From 99eeca56d63cad8000f30d0ff2dff53d92107d5b Mon Sep 17 00:00:00 2001 From: feynmanlin <315157973@qq.com> Date: Thu, 4 Jan 2024 09:18:04 +0800 Subject: [PATCH] Fix unknown exception caused by thread safety issue (#2033) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Motivation Both the response thread and timeout detection thread will access PendingProduceCallback This problem will easily occur when the sending latency is close to the timeout threshold. Step-1: response thread and timeout detection thread access 1 at the same time, none of them meet the conditions for return Step-2: timeout detection thread execute to 2, and set responseMap = null Setp-3: response thread execute to 3, cause NPE image ### Modifications Add synchronized --- .../pulsar/handlers/kop/storage/ReplicaManager.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java index bc300f80f5..3afa9cce24 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java @@ -108,7 +108,7 @@ private static class PendingProduceCallback implements Runnable { final CompletableFuture> completableFuture; Map entriesPerPartition; @Override - public void run() { + public synchronized void run() { topicPartitionNum.set(0); if (completableFuture.isDone()) { // It may be triggered again in DelayedProduceAndFetch @@ -116,9 +116,10 @@ public void run() { } // add the topicPartition with timeout error if it's not existed in responseMap entriesPerPartition.keySet().forEach(topicPartition -> { - if (!responseMap.containsKey(topicPartition)) { + ProduceResponse.PartitionResponse response = responseMap.putIfAbsent(topicPartition, + new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT)); + if (response == null) { log.error("Adding dummy REQUEST_TIMED_OUT to produce response for {}", topicPartition); - responseMap.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT)); } }); if (log.isDebugEnabled()) {