Skip to content

Commit 7b6b15d

Browse files
jbotuckfmbenhassine
authored andcommitted
KafkaItemWriter.write should not return until items are confirmed to have been written
Issue #3773
1 parent 64e0bda commit 7b6b15d

File tree

3 files changed

+31
-5
lines changed

3 files changed

+31
-5
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/KeyValueItemWriter.java

+2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ public void write(List<? extends V> items) throws Exception {
4343
K key = itemKeyMapper.convert(item);
4444
writeKeyValue(key, item);
4545
}
46+
flush();
4647
}
48+
protected void flush() throws Exception {}
4749

4850
/**
4951
* Subclasses implement this method to write each item to key value store

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemWriter.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@
1919
import org.springframework.batch.item.ItemWriter;
2020
import org.springframework.batch.item.KeyValueItemWriter;
2121
import org.springframework.kafka.core.KafkaTemplate;
22+
import org.springframework.kafka.support.SendResult;
2223
import org.springframework.util.Assert;
24+
import org.springframework.util.concurrent.ListenableFuture;
25+
26+
import java.util.ArrayList;
27+
import java.util.List;
2328

2429
/**
2530
* <p>
@@ -34,15 +39,24 @@
3439
public class KafkaItemWriter<K, T> extends KeyValueItemWriter<K, T> {
3540

3641
protected KafkaTemplate<K, T> kafkaTemplate;
42+
private final List<ListenableFuture<SendResult<K, T>>> listenableFutures = new ArrayList<>();
3743

3844
@Override
3945
protected void writeKeyValue(K key, T value) {
4046
if (this.delete) {
41-
this.kafkaTemplate.sendDefault(key, null);
47+
listenableFutures.add(this.kafkaTemplate.sendDefault(key, null));
4248
}
4349
else {
44-
this.kafkaTemplate.sendDefault(key, value);
50+
listenableFutures.add(this.kafkaTemplate.sendDefault(key, value));
51+
}
52+
}
53+
@Override
54+
protected void flush() throws Exception{
55+
kafkaTemplate.flush();
56+
for(ListenableFuture<SendResult<K,T>> future: listenableFutures){
57+
future.get();
4558
}
59+
listenableFutures.clear();
4660
}
4761

4862
@Override

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemWriterTests.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,31 @@
2525

2626
import org.springframework.core.convert.converter.Converter;
2727
import org.springframework.kafka.core.KafkaTemplate;
28+
import org.springframework.kafka.support.SendResult;
29+
import org.springframework.util.concurrent.ListenableFuture;
2830

2931
import static org.junit.Assert.assertEquals;
3032
import static org.junit.Assert.fail;
31-
import static org.mockito.Mockito.verify;
32-
import static org.mockito.Mockito.when;
33+
import static org.mockito.ArgumentMatchers.any;
34+
import static org.mockito.Mockito.*;
3335

3436
public class KafkaItemWriterTests {
3537

3638
@Mock
3739
private KafkaTemplate<String, String> kafkaTemplate;
3840

41+
@Mock
42+
private ListenableFuture<SendResult<String, String>> future;
43+
3944
private KafkaItemKeyMapper itemKeyMapper;
4045

4146
private KafkaItemWriter<String, String> writer;
4247

4348
@Before
4449
public void setUp() throws Exception {
45-
MockitoAnnotations.initMocks(this);
50+
MockitoAnnotations.openMocks(this);
4651
when(this.kafkaTemplate.getDefaultTopic()).thenReturn("defaultTopic");
52+
when(this.kafkaTemplate.sendDefault(any(), any())).thenReturn(future);
4753
this.itemKeyMapper = new KafkaItemKeyMapper();
4854
this.writer = new KafkaItemWriter<>();
4955
this.writer.setKafkaTemplate(this.kafkaTemplate);
@@ -90,6 +96,8 @@ public void testBasicWrite() throws Exception {
9096

9197
verify(this.kafkaTemplate).sendDefault(items.get(0), items.get(0));
9298
verify(this.kafkaTemplate).sendDefault(items.get(1), items.get(1));
99+
verify(this.kafkaTemplate).flush();
100+
verify(this.future, times(2)).get();
93101
}
94102

95103
@Test
@@ -101,6 +109,8 @@ public void testBasicDelete() throws Exception {
101109

102110
verify(this.kafkaTemplate).sendDefault(items.get(0), null);
103111
verify(this.kafkaTemplate).sendDefault(items.get(1), null);
112+
verify(this.kafkaTemplate).flush();
113+
verify(this.future, times(2)).get();
104114
}
105115

106116
@Test

0 commit comments

Comments
 (0)