Skip to content

Commit

Permalink
Fix Kafka flush() contract different in vanilla client v/s li-apache-…
Browse files Browse the repository at this point in the history
…kafka-clients (#192)

* Removed verifyOpen() from flush() and flush(long timeout, TimeUnit timeUnit).
* Peeled InvocationTargetException and re-threw the vanilla client exception in flush().
* Added producer flush() test.
* Added testProducerFlushWithBrokerKilled.
* Fixed the comments.
* Changed exception handling and some flush test modification.
  • Loading branch information
fluffywei authored Feb 22, 2021
1 parent 31ef712 commit a9db42e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -30,6 +31,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -196,6 +198,70 @@ public void uncaughtException(Thread t, Throwable e) {
Assert.assertTrue(root instanceof RecordTooLargeException, root.getMessage());
}

@Test
public void testProducerFlushAfterClose() throws Exception {
  // Verifies the vanilla-client flush() contract: calling flush() on an
  // already-closed producer must not throw (the instrumented wrapper no
  // longer runs verifyOpen() in flush()).
  // Topic name now matches this test; it was copy-pasted from another test.
  String topic = "testProducerFlushAfterClose";
  createTopic(topic, 1);

  Random random = new Random(666); // fixed seed for reproducible payloads
  Properties extra = new Properties();
  extra.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 1500);
  extra.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
  extra.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  extra.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  Properties baseProducerConfig = getProducerProperties(extra);
  LiKafkaInstrumentedProducerImpl<byte[], byte[]> producer = new LiKafkaInstrumentedProducerImpl<byte[], byte[]>(
      baseProducerConfig,
      Collections.emptyMap(),
      (baseConfig, overrideConfig) -> new LiKafkaProducerImpl<byte[], byte[]>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
      () -> "bogus",
      10
  );
  byte[] key = new byte[500];
  byte[] value = new byte[500];
  random.nextBytes(key);
  random.nextBytes(value);
  ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, value);

  producer.send(record);
  producer.close(Duration.ofSeconds(0));
  // Must be a no-op rather than an IllegalStateException after close().
  producer.flush(0, TimeUnit.MILLISECONDS);
}

@Test
public void testProducerFlushWithTimeoutException() throws Exception {
  // Verifies that a bounded flush() on an unreachable cluster surfaces the
  // vanilla client's TimeoutException (the InvocationTargetException from the
  // reflective bounded-flush call is unwrapped) rather than an
  // IllegalStateException.
  // Topic name now matches this test; it was copy-pasted from another test.
  String topic = "testProducerFlushWithTimeoutException";
  createTopic(topic, 1);

  Random random = new Random(666); // fixed seed for reproducible payloads
  Properties extra = new Properties();
  extra.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 1500);
  extra.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
  extra.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  extra.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  Properties baseProducerConfig = getProducerProperties(extra);
  LiKafkaInstrumentedProducerImpl<byte[], byte[]> producer = new LiKafkaInstrumentedProducerImpl<byte[], byte[]>(
      baseProducerConfig,
      Collections.emptyMap(),
      (baseConfig, overrideConfig) -> new LiKafkaProducerImpl<byte[], byte[]>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
      () -> "bogus",
      10
  );
  byte[] key = new byte[500];
  byte[] value = new byte[500];
  random.nextBytes(key);
  random.nextBytes(value);
  ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, value);
  producer.send(record);
  // Snapshot the broker ids before killing: killBroker(id) may mutate
  // super._brokers, and iterating its live keySet() while doing so risks a
  // ConcurrentModificationException.
  Integer[] brokerIds = super._brokers.keySet().toArray(new Integer[0]);
  for (int id : brokerIds) {
    super.killBroker(id);
  }
  producer.send(record);
  // With no brokers reachable, the bounded flush must time out with the
  // vanilla org.apache.kafka.common.errors.TimeoutException.
  Assert.assertThrows(TimeoutException.class, () -> producer.flush(0, TimeUnit.MILLISECONDS));
}

private void createTopic(String topicName, int numPartitions) throws Exception {
try (AdminClient adminClient = createRawAdminClient(null)) {
adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, numPartitions, (short) 1))).all().get(1, TimeUnit.MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,6 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callbac

@Override
public void flush() {
verifyOpen();

delegateLock.readLock().lock();
try {
delegate.flush();
Expand All @@ -355,17 +353,21 @@ public void flush() {
* If the underlying producer doesn't support a bounded flush, it will invoke the {@link #flush()}.
*/
public void flush(long timeout, TimeUnit timeUnit) {
verifyOpen();

delegateLock.readLock().lock();
try {
boolean useSeparateThreadForFlush = false;
if (boundedFlushMethod != null) {
try {
boundedFlushMethod.invoke(delegate, timeout, timeUnit);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException("failed to invoke the bounded flush method", e);
}
} catch (IllegalAccessException illegalAccessException) {
throw new IllegalStateException("Failed to invoke the bounded flush method", illegalAccessException);
} catch (InvocationTargetException invocationTargetException) {
if (invocationTargetException.getCause() instanceof RuntimeException) {
throw (RuntimeException) invocationTargetException.getCause();
} else {
throw new IllegalStateException("Failed to invoke the bounded flush method", invocationTargetException);
}
}
} else {
useSeparateThreadForFlush = true;
}
Expand Down Expand Up @@ -450,7 +452,6 @@ public void close(Duration timeout) {
delegate.close(timeout.toMillis(), TimeUnit.MILLISECONDS);
}
} finally {
this.delegate = null;
closeMdsClient();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,14 @@ public void flush(long timeout, TimeUnit timeUnit) {
if (_boundedFlushMethod != null) {
try {
_boundedFlushMethod.invoke(_producer, timeout, timeUnit);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException("failed to invoke the bounded flush method", e);
} catch (IllegalAccessException illegalAccessException) {
throw new IllegalStateException("Failed to invoke the bounded flush method", illegalAccessException);
} catch (InvocationTargetException invocationTargetException) {
if (invocationTargetException.getCause() instanceof RuntimeException) {
throw (RuntimeException) invocationTargetException.getCause();
} else {
throw new IllegalStateException("Failed to invoke the bounded flush method", invocationTargetException);
}
}
} else {
useSeparateThreadForFlush = true;
Expand Down

0 comments on commit a9db42e

Please sign in to comment.