-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize implementation of getAggregateRawMetrics in core-tools #1468
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,7 @@ import com.nvidia.spark.rapids.tool.profiling._ | |
|
||
import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils} | ||
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo | ||
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef, AccumNameRef} | ||
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef} | ||
|
||
/** | ||
* Does analysis on the DataFrames from object of AppBase. | ||
|
@@ -85,10 +85,10 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { | |
} else { | ||
val jobAggAccumulator = new AggAccumHelper() | ||
val perJobRec = jobAggAccumulator.accumPerJob( | ||
jc.stageIds.filter(stageLevelSparkMetrics(index).contains) | ||
.map { stageId => | ||
jc.stageIds.collect { | ||
case stageId if stageLevelSparkMetrics(index).contains(stageId) => | ||
stageLevelSparkMetrics(index)(stageId) | ||
}) | ||
}) | ||
if (perJobRec.isEmptyAggregates) { | ||
None | ||
} else { | ||
|
@@ -178,10 +178,10 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { | |
// TODO: Should we only consider successful tasks? | ||
val sqlAggAccumulator = new AggAccumHelper() | ||
val preSqlRec = sqlAggAccumulator.accumPerSQL( | ||
stagesInSQL.filter(stageLevelSparkMetrics(index).contains) | ||
.map { stageId => | ||
stagesInSQL.collect { | ||
case stageId if stageLevelSparkMetrics(index).contains(stageId) => | ||
stageLevelSparkMetrics(index)(stageId) | ||
}) | ||
}) | ||
if (preSqlRec.isEmptyAggregates) { | ||
None | ||
} else { | ||
|
@@ -322,20 +322,21 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { | |
app.asInstanceOf[ApplicationInfo].planMetricProcessor | ||
} | ||
val zeroAccumProfileResults = | ||
AccumProfileResults(0, 0, AccumMetaRef(0L, AccumNameRef("")), 0L, 0L, 0L, 0L) | ||
|
||
AccumProfileResults(0, 0, AccumMetaRef.EMPTY_ACCUM_META_REF, 0L, 0L, 0L, 0L) | ||
val emptyNodeNames = Seq.empty[String] | ||
val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults] | ||
// TODO: this has stage attempts. we should handle different attempts | ||
app.stageManager.getAllStages.map { sm => | ||
// TODO: Should we only consider successful tasks? | ||
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId, | ||
sm.stageInfo.attemptNumber()) | ||
// count duplicate task attempts | ||
val numTasks = tasksInStage.size | ||
val nodeNames = sqlAnalyzer.stageToNodeNames. | ||
getOrElse(sm.stageInfo.stageId, Seq.empty[String]) | ||
val diagnosticMetricsMap = sqlAnalyzer.stageToDiagnosticMetrics. | ||
getOrElse(sm.stageInfo.stageId, HashMap.empty[String, AccumProfileResults]). | ||
withDefaultValue(zeroAccumProfileResults) | ||
val nodeNames = sqlAnalyzer.stageToNodeNames.getOrElse(sm.stageInfo.stageId, emptyNodeNames) | ||
val diagnosticMetricsMap = | ||
sqlAnalyzer.stageToDiagnosticMetrics | ||
.getOrElse(sm.stageInfo.stageId, emptyDiagnosticMetrics) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reformated the code because it was not easy to read that |
||
.withDefaultValue(zeroAccumProfileResults) | ||
val srTotalBytesMetrics = | ||
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead)) | ||
|
||
|
@@ -450,8 +451,6 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { | |
perStageRec.srTotalBytesReadSum, | ||
perStageRec.swBytesWrittenSum, | ||
perStageRec.swRecordsWrittenSum, | ||
// Leave this timeUnit in NanoSeconds so that it will be more accurate when we take | ||
// aggregates on higher levels (i.e., SQL/Job) | ||
perStageRec.swWriteTimeSum) | ||
stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow) | ||
} | ||
|
@@ -477,12 +476,4 @@ object AppSparkMetricsAnalyzer { | |
StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum) | ||
} | ||
} | ||
|
||
def maxWithEmptyHandling(arr: Iterable[Long]): Long = { | ||
if (arr.isEmpty) { | ||
0L | ||
} else { | ||
arr.max | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,11 +26,6 @@ import org.apache.spark.sql.rapids.tool.store.TaskModel | |
* a parellel processor can be used to split the iterables without changing the caller side. | ||
*/ | ||
class AggAccumHelper { | ||
private def initializeRecord(rec: TaskMetricsAccumRec, iterable: Iterable[Any]): Unit = { | ||
if (iterable.isEmpty) { // Reset aggregate fields for empty collections | ||
rec.resetFields() | ||
} | ||
} | ||
|
||
private def accumCachedRecords[R <: TaskMetricsAccumRec]( | ||
stageRecords: Iterable[StageAggTaskMetricsProfileResult], | ||
|
@@ -45,22 +40,19 @@ class AggAccumHelper { | |
|
||
def accumPerStage(taskRecords: Iterable[TaskModel]): TaskMetricsAccumRec = { | ||
val resRec = createStageAccumRecord() | ||
initializeRecord(resRec, taskRecords) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was also wondering the need to |
||
taskRecords.foreach(resRec.addRecord) | ||
resRec.finalizeAggregation() | ||
resRec | ||
} | ||
|
||
def accumPerSQL(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): SQLAggAccum = { | ||
val resRec = SQLAggAccum() | ||
initializeRecord(resRec, stageRecords) | ||
accumCachedRecords(stageRecords, resRec) | ||
resRec | ||
} | ||
|
||
def accumPerJob(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): JobAggAccum = { | ||
val resRec = JobAggAccum() | ||
initializeRecord(resRec, stageRecords) | ||
accumCachedRecords(stageRecords, resRec) | ||
resRec | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,12 +63,17 @@ class TaskMetricsAccumRec { | |
*/ | ||
def isEmptyAggregates: Boolean = numTasks == 0 | ||
|
||
/** | ||
* Reset all fields to 0. This is used to reset the fields when the Task iterator is empty. | ||
* When the iterator is empty, then fields such as "max" should be reset to 0. | ||
*/ | ||
def resetFields(): Unit = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Can we add a comment on why do we need to reset fields here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done! Also refactored the code to do that within the class which is better OOP |
||
durationMax = 0 | ||
durationMin = 0 | ||
peakExecutionMemoryMax = 0 | ||
resultSizeMax = 0 | ||
} | ||
nartal1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def addRecord(rec: TaskModel): Unit = { | ||
numTasks += 1 | ||
// SumFields | ||
|
@@ -102,6 +107,7 @@ class TaskMetricsAccumRec { | |
// Min Fields | ||
durationMin = math.min(durationMin, rec.duration) | ||
} | ||
|
||
def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = { | ||
// Sums | ||
numTasks += rec.numTasks | ||
|
@@ -143,5 +149,9 @@ class TaskMetricsAccumRec { | |
*/ | ||
def finalizeAggregation(): Unit = { | ||
durationAvg = ToolUtils.calculateAverage(durationSum, numTasks, 1) | ||
if (numTasks < 1) { | ||
// number of tasks is 0, then we should reset fields such as (max, min) to 0. | ||
resetFields() | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cindyyuanjiang
N/A
"There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @amahussein!