Skip to content

Commit e176058

Browse files
committed
Add timeout when flushing items to Kafka in KafkaItemWriter
Issue #3773
1 parent 5cf389c commit e176058

File tree

4 files changed

+51
-6
lines changed

4 files changed

+51
-6
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemWriter.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.util.ArrayList;
2727
import java.util.List;
28+
import java.util.concurrent.TimeUnit;
2829

2930
/**
3031
* <p>
@@ -33,13 +34,15 @@
3334
* </p>
3435
*
3536
* @author Mathieu Ouellet
37+
* @author Mahmoud Ben Hassine
3638
* @since 4.2
3739
*
3840
*/
3941
public class KafkaItemWriter<K, T> extends KeyValueItemWriter<K, T> {
4042

4143
protected KafkaTemplate<K, T> kafkaTemplate;
4244
private final List<ListenableFuture<SendResult<K, T>>> listenableFutures = new ArrayList<>();
45+
private long timeout = -1;
4346

4447
@Override
4548
protected void writeKeyValue(K key, T value) {
@@ -55,7 +58,12 @@ protected void writeKeyValue(K key, T value) {
5558
protected void flush() throws Exception{
5659
this.kafkaTemplate.flush();
5760
for(ListenableFuture<SendResult<K,T>> future: this.listenableFutures){
58-
future.get();
61+
if (this.timeout >= 0) {
62+
future.get(this.timeout, TimeUnit.MILLISECONDS);
63+
}
64+
else {
65+
future.get();
66+
}
5967
}
6068
this.listenableFutures.clear();
6169
}
@@ -73,4 +81,15 @@ protected void init() {
7381
public void setKafkaTemplate(KafkaTemplate<K, T> kafkaTemplate) {
7482
this.kafkaTemplate = kafkaTemplate;
7583
}
84+
85+
/**
86+
* The time limit to wait when flushing items to Kafka.
87+
*
88+
* @param timeout milliseconds to wait, defaults to -1 (no timeout).
89+
* @since 4.3.2
90+
*/
91+
public void setTimeout(long timeout) {
92+
this.timeout = timeout;
93+
}
94+
7695
}

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/builder/KafkaItemWriterBuilder.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
* A builder implementation for the {@link KafkaItemWriter}
2626
*
2727
* @author Mathieu Ouellet
28+
* @author Mahmoud Ben Hassine
2829
* @since 4.2
2930
*/
3031
public class KafkaItemWriterBuilder<K, V> {
@@ -35,6 +36,8 @@ public class KafkaItemWriterBuilder<K, V> {
3536

3637
private boolean delete;
3738

39+
private long timeout = -1;
40+
3841
/**
3942
* Establish the KafkaTemplate to be used by the KafkaItemWriter.
4043
* @param kafkaTemplate the template to be used
@@ -71,6 +74,19 @@ public KafkaItemWriterBuilder<K, V> delete(boolean delete) {
7174
return this;
7275
}
7376

77+
/**
78+
* The time limit to wait when flushing items to Kafka.
79+
*
80+
* @param timeout milliseconds to wait, defaults to -1 (no timeout).
81+
* @return The current instance of the builder.
82+
* @see KafkaItemWriter#setTimeout(long)
83+
* @since 4.3.2
84+
*/
85+
public KafkaItemWriterBuilder<K, V> timeout(long timeout) {
86+
this.timeout = timeout;
87+
return this;
88+
}
89+
7490
/**
7591
* Validates and builds a {@link KafkaItemWriter}.
7692
* @return a {@link KafkaItemWriter}
@@ -83,6 +99,7 @@ public KafkaItemWriter<K, V> build() {
8399
writer.setKafkaTemplate(this.kafkaTemplate);
84100
writer.setItemKeyMapper(this.itemKeyMapper);
85101
writer.setDelete(this.delete);
102+
writer.setTimeout(this.timeout);
86103
return writer;
87104
}
88105
}

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemWriterTests.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.Arrays;
1919
import java.util.List;
20+
import java.util.concurrent.TimeUnit;
2021

2122
import org.junit.Before;
2223
import org.junit.Test;
@@ -57,6 +58,7 @@ public void setUp() throws Exception {
5758
this.writer.setKafkaTemplate(this.kafkaTemplate);
5859
this.writer.setItemKeyMapper(this.itemKeyMapper);
5960
this.writer.setDelete(false);
61+
this.writer.setTimeout(10L);
6062
this.writer.afterPropertiesSet();
6163
}
6264

@@ -99,7 +101,7 @@ public void testBasicWrite() throws Exception {
99101
verify(this.kafkaTemplate).sendDefault(items.get(0), items.get(0));
100102
verify(this.kafkaTemplate).sendDefault(items.get(1), items.get(1));
101103
verify(this.kafkaTemplate).flush();
102-
verify(this.future, times(2)).get();
104+
verify(this.future, times(2)).get(10L, TimeUnit.MILLISECONDS);
103105
}
104106

105107
@Test
@@ -112,7 +114,7 @@ public void testBasicDelete() throws Exception {
112114
verify(this.kafkaTemplate).sendDefault(items.get(0), null);
113115
verify(this.kafkaTemplate).sendDefault(items.get(1), null);
114116
verify(this.kafkaTemplate).flush();
115-
verify(this.future, times(2)).get();
117+
verify(this.future, times(2)).get(10L, TimeUnit.MILLISECONDS);
116118
}
117119

118120
@Test

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/builder/KafkaItemWriterBuilderTests.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@
3333

3434
/**
3535
* @author Mathieu Ouellet
36+
* @author Mahmoud Ben Hassine
3637
*/
3738
public class KafkaItemWriterBuilderTests {
3839

@@ -70,13 +71,19 @@ public void testNullItemKeyMapper() {
7071
public void testKafkaItemWriterBuild() {
7172
// given
7273
boolean delete = true;
74+
long timeout = 10L;
7375

7476
// when
7577
KafkaItemWriter<String, String> writer = new KafkaItemWriterBuilder<String, String>()
76-
.kafkaTemplate(this.kafkaTemplate).itemKeyMapper(this.itemKeyMapper).delete(delete).build();
78+
.kafkaTemplate(this.kafkaTemplate)
79+
.itemKeyMapper(this.itemKeyMapper)
80+
.delete(delete)
81+
.timeout(timeout)
82+
.build();
7783

7884
// then
7985
assertTrue((Boolean) ReflectionTestUtils.getField(writer, "delete"));
86+
assertEquals(timeout, ReflectionTestUtils.getField(writer, "timeout"));
8087
assertEquals(this.itemKeyMapper, ReflectionTestUtils.getField(writer, "itemKeyMapper"));
8188
assertEquals(this.kafkaTemplate, ReflectionTestUtils.getField(writer, "kafkaTemplate"));
8289
}

0 commit comments

Comments
 (0)