Fix inconsistent shuffle write time sum results in Profiler output #1450
… ms only in output Signed-off-by: cindyyuanjiang <[email protected]>
LGTM. Thanks @cindyyuanjiang for the fix!
Nit: It would be nice to include the before and after values in the description. I understand that we can confirm the fix from the expected_files.
Thanks @cindyyuanjiang for this change.
@amahussein Unrelated, but should we have a similar approach for executorCpuTime and executorDeserializeCpuTime?
Thanks @nartal1! Updated the before/after values in PR description. |
Thanks @parthosa! Agree we should discuss the requirements for |
Thanks @parthosa. Yes, it would have been better to fix the inconsistency for other metrics within this very PR, since the change is small compared to the overhead of filing another bug and then dealing with a new PR.
@cindyyuanjiang |
The implementation is still not accurate, because we need to convert the units after all the tasks are aggregated on each level.
@@ -438,7 +438,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
  val peakMemoryValues = tasksInStage.map(_.peakExecutionMemory)
  val shuffleWriteTime = tasksInStage.map(_.sw_writeTime)
  (AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues),
-   shuffleWriteTime.sum)
+   TimeUnit.NANOSECONDS.toMillis(shuffleWriteTime.sum))
This still does not fix the problem because the conversion is done at the stage level.
The correct way is to convert after the metrics are aggregated at each level,
for example perStage/perSql/perJob.
The per SQL and per job results are computed based on cached per stage results. Please correct me if I am wrong.
Correct!
But when we are aggregating perSql, this PR is actually aggregating the stages per SQL after the time is converted to milliseconds.
If we want to be more accurate, then the cached per-stage results should still be in nanoseconds; the per-SQL value is then the sum in nanoseconds, and only that sum gets converted to milliseconds.
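To illustrate the ordering point, here is a minimal sketch comparing the two orders of operations. It is not the project's code: the object name, the helper names, and the stage durations are made up for illustration. The only library call used is the JDK's TimeUnit.NANOSECONDS.toMillis, which truncates toward zero, so each early conversion drops a sub-millisecond remainder.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical sketch, not taken from the PR.
object ConversionOrderSketch {
  // Made-up per-stage shuffle write times in nanoseconds (1.9 ms each).
  val stageWriteTimesNs: Seq[Long] = Seq(1900000L, 1900000L, 1900000L)

  // Convert every stage to milliseconds first, then sum:
  // each stage loses its own sub-millisecond remainder.
  def convertThenSum(valuesNs: Seq[Long]): Long =
    valuesNs.map(ns => TimeUnit.NANOSECONDS.toMillis(ns)).sum

  // Sum in nanoseconds first, then convert once:
  // only a single truncation is applied to the total.
  def sumThenConvert(valuesNs: Seq[Long]): Long =
    TimeUnit.NANOSECONDS.toMillis(valuesNs.sum)

  def main(args: Array[String]): Unit = {
    println(s"convert then sum: ${convertThenSum(stageWriteTimesNs)} ms")
    println(s"sum then convert: ${sumThenConvert(stageWriteTimesNs)} ms")
  }
}
```

With these made-up inputs the first approach reports 3 ms and the second 5 ms, out of a true total of 5.7 ms. The gap grows with the number of values aggregated.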
understood, thanks @amahussein! I will address this now.
Discussed offline. We will keep the current implementation to avoid potential overflow if we aggregate at SQL/job level.
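For context on the overflow concern, a rough sketch of how a nanosecond sum held in a Long could be guarded. This is an illustration under assumptions, not what the PR implements: sumNanosChecked and maxDaysInLong are hypothetical names. Math.addExact is the standard JDK call that throws ArithmeticException instead of silently wrapping around.

```scala
import java.util.concurrent.TimeUnit
import scala.util.Try

// Hypothetical sketch, not taken from the PR.
object NanosOverflowSketch {
  // Sums nanosecond values; yields a Failure instead of a wrapped-around
  // (negative) total if the running sum exceeds Long.MaxValue.
  def sumNanosChecked(valuesNs: Seq[Long]): Try[Long] =
    Try(valuesNs.foldLeft(0L)((acc, v) => Math.addExact(acc, v)))

  // Number of whole days of nanoseconds that fit in a signed 64-bit Long.
  val maxDaysInLong: Long = TimeUnit.NANOSECONDS.toDays(Long.MaxValue)
}
```

A Long holds about 106751 days (roughly 292 years) of nanoseconds, so whether overflow is a practical risk depends on how much task time is aggregated into a single value.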
Signed-off-by: cindyyuanjiang <[email protected]>
8e03d94
Applied same approach for |
Signed-off-by: cindyyuanjiang <[email protected]>
  }

  override def convertToCSVSeq: Seq[String] = {
    Seq(appIndex.toString, StringUtils.reformatCSVString(appId), rootsqlID.getOrElse("").toString,
      sqlID.toString, durStr, containsDataset.toString, appDurStr,
      StringUtils.reformatCSVString(potentialStr), execCpuTimePercent)
Updated format only for better readability.
@@ -950,14 +992,27 @@ case class SQLDurationExecutorTimeProfileResult(appIndex: Int, appId: String,
  }

  override def convertToSeq: Seq[String] = {
    Seq(appIndex.toString, rootsqlID.getOrElse("").toString, appId, sqlID.toString, durStr,
      containsDataset.toString, appDurStr, potentialStr, execCpuTimePercent)
Updated format only for better readability.
    "resultSerializationTime_sum", "resultSize_max", "sr_fetchWaitTime_sum",
    "sr_localBlocksFetched_sum", "sr_localBytesRead_sum", "sr_remoteBlocksFetched_sum",
    "sr_remoteBytesRead_sum", "sr_remoteBytesReadToDisk_sum", "sr_totalBytesRead_sum",
    "sw_bytesWritten_sum", "sw_recordsWritten_sum", "sw_writeTime_sum")
Updated format only for better readability.
@@ -924,12 +951,27 @@ case class IOAnalysisProfileResult(
  }
}

case class SQLDurationExecutorTimeProfileResult(appIndex: Int, appId: String,
    rootsqlID: Option[Long], sqlID: Long, duration: Option[Long], containsDataset: Boolean,
    appDuration: Option[Long], potentialProbs: String,
Updated format only for better readability.
    executorCpuRatio: Double) extends ProfileResult {
  override val outputHeaders = Seq("appIndex", "App ID", "RootSqlID", "sqlID", "SQL Duration",
    "Contains Dataset or RDD Op", "App Duration", "Potential Problems", "Executor CPU Time Percent")
Updated format only for better readability.
Thanks @cindyyuanjiang
minor styling issue.
core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala
core/src/main/scala/org/apache/spark/sql/rapids/tool/store/TaskModel.scala
core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala
Signed-off-by: cindyyuanjiang <[email protected]>
Thanks @cindyyuanjiang for this change. The discussion above about overflow concerns makes sense.
@amahussein @parthosa @nartal1 Question: After we make the changes, I see an Executor CPU Time Percent of 103.45 > 100 in core/src/test/resources/ProfilingExpectations/rapids_duration_and_cpu_expectation.csv. Do we want to upper-bound this ratio at 100.0, or is it okay to have percentages above 100?
It seems more like a bug.
thanks @amahussein! Filed issue: #1469
I am not sure we should fix the percentage in a followup issue. This means that we are fixing inconsistent view across 2 files and we introduce another bug.
I investigated this. It looks more like a rounding issue than a bug to me:
WDYT? @amahussein @nartal1 @parthosa |
Thanks @cindyyuanjiang for looking into this. I think this was always a bug, but we are now able to catch it due to the changes in this PR. If the raw values are measured in different units, I do not think we can fix this problem. I could not find a reason why Spark reports run time in ms and CPU time in ns.
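One way such a ratio can exceed 100 is sketched below with made-up numbers (not values from the event logs in this PR). The sketch assumes the run time has already been truncated to whole milliseconds while the CPU time keeps nanosecond resolution, so the denominator can lose more than the numerator. cpuPercent is a hypothetical helper, not the tool's actual formula.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical sketch, not taken from the PR.
object CpuRatioSketch {
  // CPU time (ns) as a percentage of run time (ms), rounded to two decimals.
  // Returns 0.0 when the run time is not positive to avoid dividing by zero.
  def cpuPercent(cpuTimeNs: Long, runTimeMs: Long): Double = {
    if (runTimeMs <= 0) {
      0.0
    } else {
      val runTimeNs = TimeUnit.MILLISECONDS.toNanos(runTimeMs)
      math.round(cpuTimeNs.toDouble / runTimeNs * 10000) / 100.0
    }
  }

  def main(args: Array[String]): Unit = {
    // Made-up task: about 2.9 ms of wall time reported as 2 ms, 2.5 ms of CPU time.
    println(cpuPercent(2500000L, 2L))
  }
}
```

In this made-up case the reported percentage is 125.0 even though the task could not have used more CPU time than wall time on a single thread. Clamping the result with math.min(value, 100.0) would hide the symptom but not the unit mismatch.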
Signed-off-by: cindyyuanjiang <[email protected]>
ded7601
Signed-off-by: cindyyuanjiang <[email protected]>
@parthosa, in systems, it is almost the standard to use nanoseconds when measuring CPU time. This is mainly because CPUs run at high frequencies, so nanoseconds give more precision when analyzing efficiency and performance (CPU utilization, etc.). Thanks @cindyyuanjiang for investigating the inconsistent ratio.
I added some changes to the fix. I found that the qualification output also has the same problem because time units are converted to milliseconds at the task level. Below is a list of some occurrences where the time units are converted at the task level (but not limited to)
@cindyyuanjiang PTAL at the patch file and please file a followup issue to fix the same problem in the qualification output. |
@amahussein thank you for putting together the patch! LGTM. I will apply the changes. I saw that in this new patch we convert nanoseconds to milliseconds when aggregating at the stage/job/SQL level. Are we okay with the potential overflow from doing it this way? Follow-up issue: #1481 (also included in PR description)
Signed-off-by: cindyyuanjiang <[email protected]>
mmm, let us re-iterate on the steps and please correct me if I am wrong:
comments on this minor refactor:
|
Thanks @amahussein! Yes, you are correct. |
Thanks @cindyyuanjiang !
LGTME!
Fixes #1408
Changes
TaskModel class: keep using nanoseconds for shuffle write time.
This improves shuffle write time metrics output to avoid potential precision loss from converting nanoseconds to milliseconds and then taking the sum of the converted numbers. It also separates TaskModel and output reporting so that we know all metrics are in their original values before output generation.
Testing
Before/After Values (shuffle write time sum)
core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_jobmetricsaggmulti_expectation.csv
944 --> 1001
849 --> 901
core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_sqlmetricsaggmulti_expectation.csv
944 --> 1001
849 --> 901
core/src/test/resources/ProfilingExpectations/rapids_join_eventlog_stagemetricsaggmulti_expectation.csv
397 --> 400
505 --> 508
42 --> 93
373 --> 376
473 --> 475
3 --> 50
Follow up issue: #1481