Skip to content

Commit

Permalink
Introduce variable for jobFinishTime
Browse files Browse the repository at this point in the history
  • Loading branch information
kr565370 committed Mar 11, 2024
1 parent a607bf7 commit 91c9c58
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ public Instant handleMessages(Instant startTime, Consumer<RecordsProcessedSummar
try {
propertiesReloader.reloadIfNeeded();
RecordsProcessed recordsProcessed = compactor.compact(job);
lastActiveTime = timeSupplier.get();
RecordsProcessedSummary summary = new RecordsProcessedSummary(recordsProcessed, jobStartTime, lastActiveTime);
Instant jobFinishTime = timeSupplier.get();
RecordsProcessedSummary summary = new RecordsProcessedSummary(recordsProcessed, jobStartTime, jobFinishTime);
summaryConsumer.accept(summary);
message.completed();
totalNumberOfMessagesProcessed++;
Expand All @@ -129,6 +129,7 @@ public Instant handleMessages(Instant startTime, Consumer<RecordsProcessedSummar
METRICS_LOGGER.info("Compaction job {}: compaction wrote {} records at {} per second", id, summary.getRecordsWritten(),
String.format("%.1f", summary.getRecordsWrittenPerSecond()));
jobStatusStore.jobFinished(job, summary, taskId);
lastActiveTime = jobFinishTime;
} catch (Exception e) {
LOGGER.error("Failed processing compaction job, putting job back on queue", e);
numConsecutiveFailures++;
Expand Down

0 comments on commit 91c9c58

Please sign in to comment.