diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 3933375512baf..2c4c3dd3de5a7 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -26,7 +26,6 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -43,7 +42,6 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.functions.api.KVRecord; import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.PushSource; import org.apache.pulsar.io.core.SourceContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +49,7 @@ /** * Simple Kafka Source to transfer messages from a Kafka topic. 
*/ -public abstract class KafkaAbstractSource extends PushSource { +public abstract class KafkaAbstractSource extends KafkaPushSource { private static final Logger LOG = LoggerFactory.getLogger(KafkaAbstractSource.class); @@ -126,7 +124,6 @@ public void open(Map config, SourceContext sourceContext) throws throw new IllegalArgumentException("Unable to instantiate Kafka consumer", ex); } this.start(); - running = true; } protected Properties beforeCreateConsumer(Properties props) { @@ -151,47 +148,36 @@ public void close() throws InterruptedException { @SuppressWarnings("unchecked") public void start() { + LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic()); + consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); runnerThread = new Thread(() -> { - LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic()); - consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); LOG.info("Kafka source started."); while (running) { - ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1L)); - CompletableFuture[] futures = new CompletableFuture[consumerRecords.count()]; - int index = 0; - for (ConsumerRecord consumerRecord : consumerRecords) { - KafkaRecord record = buildRecord(consumerRecord); - if (LOG.isDebugEnabled()) { - LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema()); + try { + ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1L)); + CompletableFuture[] futures = new CompletableFuture[consumerRecords.count()]; + int index = 0; + for (ConsumerRecord consumerRecord : consumerRecords) { + KafkaRecord record = buildRecord(consumerRecord); + if (LOG.isDebugEnabled()) { + LOG.debug("Write record {} {} {}", record.getKey(), record.getValue(), record.getSchema()); + } + consume(record); + futures[index] = record.getCompletableFuture(); + index++; } - consume(record); - futures[index] = record.getCompletableFuture(); - index++; - 
} - if (!kafkaSourceConfig.isAutoCommitEnabled()) { - try { + if (!kafkaSourceConfig.isAutoCommitEnabled()) { CompletableFuture.allOf(futures).get(); consumer.commitSync(); - } catch (InterruptedException ex) { - break; - } catch (ExecutionException ex) { - LOG.error("Error while processing records", ex); - break; } + } catch (Exception e) { + LOG.error("Error while processing records", e); + notifyError(e); + break; } } }); - runnerThread.setUncaughtExceptionHandler( - (t, e) -> { - new Thread(() -> { - LOG.error("[{}] Error while consuming records", t.getName(), e); - try { - this.close(); - } catch (Exception ex) { - LOG.error("[{}] Close kafka source error", t.getName(), e); - } - }, "Kafka Source Close Task Thread").start(); - }); + running = true; runnerThread.setName("Kafka Source Thread"); runnerThread.start(); } diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaPushSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaPushSource.java new file mode 100644 index 0000000000000..d0d92daee8d5b --- /dev/null +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaPushSource.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.kafka; + +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Source; + +/** + * Kafka Push Source. + * To maintain compatibility, we can't cherry-pick PIP-281: https://github.com/apache/pulsar/pull/20807 + * to the historical version, so the class is implemented in the kafka connector. + */ +public abstract class KafkaPushSource implements Source { + + private static class NullRecord implements Record { + @Override + public Object getValue() { + return null; + } + } + + private static class ErrorNotifierRecord implements Record { + private Exception e; + public ErrorNotifierRecord(Exception e) { + this.e = e; + } + @Override + public Object getValue() { + return null; + } + + public Exception getException() { + return e; + } + } + + private LinkedBlockingQueue> queue; + private static final int DEFAULT_QUEUE_LENGTH = 1000; + private final NullRecord nullRecord = new NullRecord(); + + public KafkaPushSource() { + this.queue = new LinkedBlockingQueue<>(this.getQueueLength()); + } + + @Override + public Record read() throws Exception { + Record record = queue.take(); + if (record instanceof ErrorNotifierRecord) { + throw ((ErrorNotifierRecord) record).getException(); + } + if (record instanceof NullRecord) { + return null; + } else { + return record; + } + } + + /** + * Send this message to be written to Pulsar. + * Pass null if you are done with this task + * @param record next message from source which should be sent to a Pulsar topic + */ + public void consume(Record record) { + try { + if (record != null) { + queue.put(record); + } else { + queue.put(nullRecord); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * Get length of the queue that records are pushed onto. 
+ * Users can override this method to customize the queue length + * @return queue length + */ + public int getQueueLength() { + return DEFAULT_QUEUE_LENGTH; + } + + /** + * Allows the source to notify errors asynchronously. + * @param ex + */ + public void notifyError(Exception ex) { + consume(new ErrorNotifierRecord(ex)); + } +} diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index ee573cacd7719..54e7ac8f8354b 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; +import java.time.Duration; import java.util.Collection; import java.lang.reflect.Field; import org.apache.kafka.clients.consumer.Consumer; @@ -31,7 +32,6 @@ import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.kafka.KafkaAbstractSource; import org.apache.pulsar.io.kafka.KafkaSourceConfig; -import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -158,26 +158,47 @@ public final void loadFromSaslYamlFileTest() throws IOException { assertEquals(config.getSslTruststorePassword(), "cert_pwd"); } - @Test - public final void closeConnectorWhenUnexpectedExceptionThrownTest() throws Exception { + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Subscribe exception") + public final void throwExceptionBySubscribe() throws Exception { KafkaAbstractSource source = new DummySource(); + + KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig(); + kafkaSourceConfig.setTopic("test-topic"); + Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig"); + 
kafkaSourceConfigField.setAccessible(true); + kafkaSourceConfigField.set(source, kafkaSourceConfig); + Consumer consumer = mock(Consumer.class); - Mockito.doThrow(new RuntimeException("Uncaught exception")).when(consumer) + Mockito.doThrow(new RuntimeException("Subscribe exception")).when(consumer) .subscribe(Mockito.any(Collection.class)); Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer"); consumerField.setAccessible(true); consumerField.set(source, consumer); - + // will throw RuntimeException. source.start(); + } + + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Pool exception") + public final void throwExceptionByPoll() throws Exception { + KafkaAbstractSource source = new DummySource(); - Field runningField = KafkaAbstractSource.class.getDeclaredField("running"); - runningField.setAccessible(true); + KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig(); + kafkaSourceConfig.setTopic("test-topic"); + Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig"); + kafkaSourceConfigField.setAccessible(true); + kafkaSourceConfigField.set(source, kafkaSourceConfig); - Awaitility.await().untilAsserted(() -> { - Assert.assertFalse((boolean) runningField.get(source)); - Assert.assertNull(consumerField.get(source)); - }); + Consumer consumer = mock(Consumer.class); + Mockito.doThrow(new RuntimeException("Pool exception")).when(consumer) + .poll(Mockito.any(Duration.class)); + + Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer"); + consumerField.setAccessible(true); + consumerField.set(source, consumer); + source.start(); + // will throw RuntimeException. + source.read(); } private File getFile(String name) {