diff --git a/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerIntegrationTest.java b/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerIntegrationTest.java
index b9c66aa..0e62675 100644
--- a/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerIntegrationTest.java
+++ b/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerIntegrationTest.java
@@ -19,6 +19,7 @@
 import java.util.Collections;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -30,6 +31,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -196,6 +198,70 @@ public void uncaughtException(Thread t, Throwable e) {
     Assert.assertTrue(root instanceof RecordTooLargeException, root.getMessage());
   }
 
+  @Test
+  public void testProducerFlushAfterClose() throws Exception {
+    String topic = "testCloseFromProduceCallbackOnSenderThread";
+    createTopic(topic, 1);
+
+    Random random = new Random(666);
+    Properties extra = new Properties();
+    extra.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 1500);
+    extra.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
+    extra.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+    extra.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+    Properties baseProducerConfig = getProducerProperties(extra);
+    LiKafkaInstrumentedProducerImpl<byte[], byte[]> producer = new LiKafkaInstrumentedProducerImpl<>(
+        baseProducerConfig,
+        Collections.emptyMap(),
+        (baseConfig, overrideConfig) -> new LiKafkaProducerImpl<>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
+        () -> "bogus",
+        10
+    );
+    byte[] key = new byte[500];
+    byte[] value = new byte[500];
+    random.nextBytes(key);
+    random.nextBytes(value);
+    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, value);
+
+    producer.send(record);
+    producer.close(Duration.ofSeconds(0));
+    producer.flush(0, TimeUnit.MILLISECONDS);
+  }
+
+  @Test
+  public void testProducerFlushWithTimeoutException() throws Exception {
+    String topic = "testCloseFromProduceCallbackOnSenderThread";
+    createTopic(topic, 1);
+
+    Random random = new Random(666);
+    Properties extra = new Properties();
+    extra.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "" + 1500);
+    extra.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
+    extra.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+    extra.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+    Properties baseProducerConfig = getProducerProperties(extra);
+    LiKafkaInstrumentedProducerImpl<byte[], byte[]> producer = new LiKafkaInstrumentedProducerImpl<>(
+        baseProducerConfig,
+        Collections.emptyMap(),
+        (baseConfig, overrideConfig) -> new LiKafkaProducerImpl<>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
+        () -> "bogus",
+        10
+    );
+    byte[] key = new byte[500];
+    byte[] value = new byte[500];
+    random.nextBytes(key);
+    random.nextBytes(value);
+    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, value);
+    producer.send(record);
+    // kill brokers
+    Set<Integer> brokerIds = super._brokers.keySet();
+    for (int id : brokerIds) {
+      super.killBroker(id);
+    }
+    producer.send(record);
+    Assert.assertThrows(TimeoutException.class, () -> producer.flush(0, TimeUnit.MILLISECONDS));
+  }
+
   private void createTopic(String topicName, int numPartitions) throws Exception {
     try (AdminClient adminClient = createRawAdminClient(null)) {
       adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, numPartitions, (short) 1))).all().get(1, TimeUnit.MINUTES);
diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java
index c686c61..5360eb7 100644
--- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java
+++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java
@@ -337,8 +337,6 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
 
   @Override
   public void flush() {
-    verifyOpen();
-
     delegateLock.readLock().lock();
     try {
       delegate.flush();
@@ -355,17 +353,21 @@ public void flush() {
    * If the underlying producer doesn't support a bounded flush, it will invoke the {@link #flush()}.
    */
   public void flush(long timeout, TimeUnit timeUnit) {
-    verifyOpen();
-
     delegateLock.readLock().lock();
     try {
       boolean useSeparateThreadForFlush = false;
       if (boundedFlushMethod != null) {
         try {
           boundedFlushMethod.invoke(delegate, timeout, timeUnit);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new IllegalStateException("failed to invoke the bounded flush method", e);
-        }
+        } catch (IllegalAccessException illegalAccessException) {
+          throw new IllegalStateException("Failed to invoke the bounded flush method", illegalAccessException);
+        } catch (InvocationTargetException invocationTargetException) {
+          if (invocationTargetException.getCause() instanceof RuntimeException) {
+            throw (RuntimeException) invocationTargetException.getCause();
+          } else {
+            throw new IllegalStateException("Failed to invoke the bounded flush method", invocationTargetException);
+          }
+        }
       } else {
         useSeparateThreadForFlush = true;
       }
@@ -450,7 +452,6 @@ public void close(Duration timeout) {
         delegate.close(timeout.toMillis(), TimeUnit.MILLISECONDS);
       }
     } finally {
-      this.delegate = null;
       closeMdsClient();
     }
   }
diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerImpl.java
index cceddce..b1871f9 100644
--- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerImpl.java
+++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerImpl.java
@@ -423,8 +423,14 @@ public void flush(long timeout, TimeUnit timeUnit) {
       if (_boundedFlushMethod != null) {
         try {
           _boundedFlushMethod.invoke(_producer, timeout, timeUnit);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          throw new IllegalStateException("failed to invoke the bounded flush method", e);
+        } catch (IllegalAccessException illegalAccessException) {
+          throw new IllegalStateException("Failed to invoke the bounded flush method", illegalAccessException);
+        } catch (InvocationTargetException invocationTargetException) {
+          if (invocationTargetException.getCause() instanceof RuntimeException) {
+            throw (RuntimeException) invocationTargetException.getCause();
+          } else {
+            throw new IllegalStateException("Failed to invoke the bounded flush method", invocationTargetException);
+          }
         }
       } else {
         useSeparateThreadForFlush = true;