Skip to content

Commit 5cf389c

Browse files
committed
Refine contribution #3827
* Update year in licence headers * Update Javadoc * Add `this` keyword where appropriate
1 parent 7b6b15d commit 5cf389c

File tree

3 files changed

+19
-10
lines changed

3 files changed

+19
-10
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/KeyValueItemWriter.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
55
* the License. You may obtain a copy of the License at
@@ -45,6 +45,12 @@ public void write(List<? extends V> items) throws Exception {
4545
}
4646
flush();
4747
}
48+
49+
/**
50+
* Flush items to the key/value store.
51+
*
52+
* @throws Exception if unable to flush items
53+
*/
4854
protected void flush() throws Exception {}
4955

5056
/**

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemWriter.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,19 +44,20 @@ public class KafkaItemWriter<K, T> extends KeyValueItemWriter<K, T> {
4444
@Override
4545
protected void writeKeyValue(K key, T value) {
4646
if (this.delete) {
47-
listenableFutures.add(this.kafkaTemplate.sendDefault(key, null));
47+
this.listenableFutures.add(this.kafkaTemplate.sendDefault(key, null));
4848
}
4949
else {
50-
listenableFutures.add(this.kafkaTemplate.sendDefault(key, value));
50+
this.listenableFutures.add(this.kafkaTemplate.sendDefault(key, value));
5151
}
5252
}
53+
5354
@Override
5455
protected void flush() throws Exception{
55-
kafkaTemplate.flush();
56-
for(ListenableFuture<SendResult<K,T>> future: listenableFutures){
56+
this.kafkaTemplate.flush();
57+
for(ListenableFuture<SendResult<K,T>> future: this.listenableFutures){
5758
future.get();
5859
}
59-
listenableFutures.clear();
60+
this.listenableFutures.clear();
6061
}
6162

6263
@Override

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemWriterTests.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,7 +31,9 @@
3131
import static org.junit.Assert.assertEquals;
3232
import static org.junit.Assert.fail;
3333
import static org.mockito.ArgumentMatchers.any;
34-
import static org.mockito.Mockito.*;
34+
import static org.mockito.Mockito.verify;
35+
import static org.mockito.Mockito.when;
36+
import static org.mockito.Mockito.times;
3537

3638
public class KafkaItemWriterTests {
3739

@@ -49,7 +51,7 @@ public class KafkaItemWriterTests {
4951
public void setUp() throws Exception {
5052
MockitoAnnotations.openMocks(this);
5153
when(this.kafkaTemplate.getDefaultTopic()).thenReturn("defaultTopic");
52-
when(this.kafkaTemplate.sendDefault(any(), any())).thenReturn(future);
54+
when(this.kafkaTemplate.sendDefault(any(), any())).thenReturn(this.future);
5355
this.itemKeyMapper = new KafkaItemKeyMapper();
5456
this.writer = new KafkaItemWriter<>();
5557
this.writer.setKafkaTemplate(this.kafkaTemplate);

0 commit comments

Comments
 (0)