Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Jun 28, 2023
1 parent d2d3ee4 commit 940e90d
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName) {
log.debug("[{}] Return null for getTopic({}) since channel is closing",
requestHandler.ctx.channel(), topicName);
}
return CompletableFuture.completedFuture(Optional.empty());
}
CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture =
kafkaTopicLookupService.getTopic(topicName, requestHandler.ctx.channel());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,6 @@ private void handleEntries(final CompletableFuture<ReadRecordsResult> future,
log.debug("Partition {} read entry completed in {} ns",
topicPartition, MathUtils.nowInNano() - startDecodingEntriesNanos);
}
log.info("Partition {} read entry completed. {} ", topicPartition, abortedTransactions);
future.complete(ReadRecordsResult
.get(decodeResult, abortedTransactions, highWatermark, lso, lastPosition, this));
}, context.getDecodeExecutor()).exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,29 +145,7 @@ void maybeTakeSnapshot(Executor executor) {
}
lastSnapshotTime = now;


// take the snapshot in this thread, that is the same thread
// that executes mutations
ProducerStateManagerSnapshot snapshot = getProducerStateManagerSnapshot();

// write to Pulsar in another thread, and also ignore errors
executor.execute(new SafeRunnable() {
@Override
public void safeRun() {
producerStateManagerSnapshotBuffer
.write(snapshot)
.whenComplete((res, error) -> {
if (error == null) {
log.info("Snapshot for {} taken at offset {} written",
topicPartition, snapshot.getOffset());
} else {
log.info("Error writing snapshot for {} taken at offset {}",
topicPartition, snapshot.getOffset(), error);
}
});
}
});

takeSnapshot(executor);
}

private ProducerStateManagerSnapshot getProducerStateManagerSnapshot() {
Expand Down

0 comments on commit 940e90d

Please sign in to comment.