Commit 4d8fbe8

Merge pull request #258 from zigarn/issue-257
Fix thread-safety in KafkaRecordTracker
2 parents: eb6db72 + 1bac3b9

src/main/java/com/splunk/kafka/connect/KafkaRecordTracker.java

Lines changed: 8 additions & 17 deletions
@@ -24,14 +24,17 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 final class KafkaRecordTracker {
     private static final Logger log = LoggerFactory.getLogger(SplunkSinkTask.class);
-    private ConcurrentHashMap<TopicPartition, TreeMap<Long, EventBatch>> all; // TopicPartition + Long offset represents the SinkRecord
+    private ConcurrentMap<TopicPartition, ConcurrentNavigableMap<Long, EventBatch>> all; // TopicPartition + Long offset represents the SinkRecord
     private AtomicLong total;
     private ConcurrentLinkedQueue<EventBatch> failed;
     private volatile Map<TopicPartition, OffsetAndMetadata> offsets;
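
Note: the per-partition map of in-flight batches changes from an unsynchronized TreeMap to a ConcurrentSkipListMap, declared through the ConcurrentMap and ConcurrentNavigableMap interfaces. TreeMap is not safe for concurrent use, so two threads touching the same partition's map (presumably the sink task thread adding batches and the acknowledgement path removing them) could corrupt its internal structure. ConcurrentSkipListMap keeps the same ascending-by-offset ordering while tolerating concurrent readers and writers. A minimal standalone sketch of that ordered, concurrent behaviour (illustrative only, not code from this repository):

    import java.util.Map;
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class SkipListDemo {
        public static void main(String[] args) {
            // Keys stay sorted, and the map tolerates concurrent writers.
            ConcurrentNavigableMap<Long, String> byOffset = new ConcurrentSkipListMap<>();
            byOffset.put(42L, "batch-c");
            byOffset.put(7L, "batch-a");
            byOffset.put(19L, "batch-b");

            // Iteration is in ascending key order: 7, 19, 42 --
            // the property the tracker relies on to find the lowest
            // unacknowledged offset per partition.
            for (Map.Entry<Long, String> e : byOffset.entrySet()) {
                System.out.println(e.getKey() + " -> " + e.getValue());
            }
        }
    }
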
@@ -57,7 +60,7 @@ public void removeAckedEventBatch(final EventBatch batch) {
             final SinkRecord record = (SinkRecord) event.getTied();
             TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
             //log.debug("Processing topic {} partition {}", record.topic(), record.kafkaPartition());
-            TreeMap<Long, EventBatch> tpRecords = all.get(tp);
+            ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.get(tp);
             if (tpRecords == null) {
                 log.error("KafkaRecordTracker removing a batch in an unknown partition {} {} {}", record.topic(), record.kafkaPartition(), record.kafkaOffset());
                 return;
@@ -76,11 +79,7 @@ public void removeAckedEventBatch(final EventBatch batch) {
                 }
             }
             if (offset >= 0) {
-                if (offsets.containsKey(tp)) {
-                    offsets.replace(tp, new OffsetAndMetadata(offset + 1));
-                } else {
-                    offsets.put(tp, new OffsetAndMetadata(offset + 1));
-                }
+                offsets.put(tp, new OffsetAndMetadata(offset + 1));
             }
         }
     }
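
Note: the containsKey()/replace()-else-put() pair collapses to a single put(). Map.put already inserts when the key is absent and overwrites when it is present, so the two branches did the same work, and the check-then-act sequence was one more non-atomic access pattern. A trivial standalone illustration (hypothetical names):

    import java.util.HashMap;
    import java.util.Map;

    public class PutDemo {
        public static void main(String[] args) {
            Map<String, Long> offsets = new HashMap<>();

            // put() inserts when the key is absent...
            offsets.put("topic-0", 5L);
            // ...and overwrites when it is present -- no containsKey() needed.
            offsets.put("topic-0", 6L);

            System.out.println(offsets.get("topic-0")); // prints 6
        }
    }
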
@@ -98,16 +97,8 @@ public void addEventBatch(final EventBatch batch) {
             if (event.getTied() instanceof SinkRecord) {
                 final SinkRecord record = (SinkRecord) event.getTied();
                 TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
-                TreeMap<Long, EventBatch> tpRecords = all.get(tp);
-                if (tpRecords == null) {
-                    tpRecords = new TreeMap<>();
-                    all.put(tp, tpRecords);
-                }
-
-                if (!tpRecords.containsKey(record.kafkaOffset())) {
-                    tpRecords.put(record.kafkaOffset(), batch);
-                    total.incrementAndGet();
-                }
+                ConcurrentNavigableMap<Long, EventBatch> tpRecords = all.computeIfAbsent(tp, k -> new ConcurrentSkipListMap<>());
+                tpRecords.computeIfAbsent(record.kafkaOffset(), k -> { total.incrementAndGet(); return batch; });
             }
         }
     }
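
Note: this hunk is the core of the fix. The old get / null-check / put sequence is a check-then-act race: two threads adding records for the same new partition could each see null, each create a TreeMap, and the second put would silently discard the first thread's map along with any batch already registered in it. ConcurrentMap.computeIfAbsent performs the lookup and insert atomically, so exactly one ConcurrentSkipListMap is created per partition, and the nested computeIfAbsent likewise increments total only when the offset was genuinely new. A hedged sketch of the lost-update race the old pattern allows (Tracker, addRecordRacy, and addRecordSafe are hypothetical names, not from this repository):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class Tracker {
        private final ConcurrentMap<String, ConcurrentNavigableMap<Long, String>> all =
                new ConcurrentHashMap<>();

        // Old shape: check-then-act. Between get() and put(), another thread
        // can insert its own map for the same partition; one of the two maps
        // (and the batches stored in it) is then lost.
        void addRecordRacy(String partition, long offset, String batch) {
            ConcurrentNavigableMap<Long, String> tpRecords = all.get(partition);
            if (tpRecords == null) {
                tpRecords = new ConcurrentSkipListMap<>();
                all.put(partition, tpRecords);   // may overwrite a concurrent insert
            }
            tpRecords.put(offset, batch);
        }

        // New shape: computeIfAbsent does the lookup and insert atomically,
        // so every thread ends up writing into the same per-partition map.
        void addRecordSafe(String partition, long offset, String batch) {
            all.computeIfAbsent(partition, k -> new ConcurrentSkipListMap<>())
               .put(offset, batch);
        }
    }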
