diff --git a/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/EmbeddedKafkaBasicConsumeActionIntegrationTest.java b/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/EmbeddedKafkaBasicConsumeActionIntegrationTest.java new file mode 100644 index 000000000..3b2a8629b --- /dev/null +++ b/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/EmbeddedKafkaBasicConsumeActionIntegrationTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017-2023 Enedis + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.chutneytesting.action.kafka; + +import org.junit.jupiter.api.AfterAll; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaZKBroker; + +public class EmbeddedKafkaBasicConsumeActionIntegrationTest extends KafkaBasicConsumeActionIntegrationTest { + + private static EmbeddedKafkaBroker embeddedKafkaBroker; + @Override + protected String initBroker() { + embeddedKafkaBroker = new EmbeddedKafkaZKBroker(1); + embeddedKafkaBroker.afterPropertiesSet(); + return embeddedKafkaBroker.getBrokersAsString(); + } + + @AfterAll + public static void afterAll() { + producer.close(); + embeddedKafkaBroker.destroy(); + System.out.println("after all child"); + } +} diff --git a/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/KafkaBasicConsumeActionIntegrationTest.java b/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/KafkaBasicConsumeActionIntegrationTest.java index 0641bc10d..beed610dd 100644 --- a/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/KafkaBasicConsumeActionIntegrationTest.java +++ b/chutney/action-impl/src/test/java/com/chutneytesting/action/kafka/KafkaBasicConsumeActionIntegrationTest.java @@ -25,7 +25,6 @@ import static com.chutneytesting.action.spi.ActionExecutionResult.Status.Success; import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; -import static org.springframework.kafka.test.utils.KafkaTestUtils.getCurrentOffset; import static org.springframework.util.MimeTypeUtils.TEXT_PLAIN_VALUE; import com.chutneytesting.action.TestLogger; @@ -39,69 +38,58 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.IntStream; +import java.util.UUID; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.test.EmbeddedKafkaZKBroker; -import org.springframework.kafka.test.utils.KafkaTestUtils; -public class KafkaBasicConsumeActionIntegrationTest { +public abstract class KafkaBasicConsumeActionIntegrationTest { - private static final String TOPIC = "topic"; - private static final String GROUP = "mygroup"; - private static String KEYSTORE_JKS; - private EmbeddedKafkaZKBroker embeddedKafkaBroker; + private final String GROUP = "mygroup"; + private String uniqueTopic; - private Producer producer; - private TestTarget.TestTargetBuilder targetBuilder; + protected static Producer producer; + private final TestTarget.TestTargetBuilder targetBuilder; private TestLogger logger; - @BeforeAll - static void beforeAll() throws URISyntaxException { - KEYSTORE_JKS = Paths.get(requireNonNull(HttpsServerStartActionTest.class.getResource("/security/server.jks")).toURI()).toAbsolutePath().toString(); + public KafkaBasicConsumeActionIntegrationTest() { + String brokerPath = initBroker(); + producer = createProducer(brokerPath); + targetBuilder = TestTarget.TestTargetBuilder.builder().withTargetId("kafka").withUrl("tcp://" + brokerPath); } + protected abstract String initBroker(); + @BeforeEach public void before() { logger = new TestLogger(); - embeddedKafkaBroker = new EmbeddedKafkaZKBroker(1, false, TOPIC); - embeddedKafkaBroker.afterPropertiesSet(); - producer = createProducer(); - targetBuilder = TestTarget.TestTargetBuilder.builder().withTargetId("kafka").withUrl("tcp://" + embeddedKafkaBroker.getBrokersAsString()); - } - - @AfterEach - void tearDown() { - producer.close(); - embeddedKafkaBroker.destroy(); + uniqueTopic = UUID.randomUUID().toString(); } @Test public void should_consume_message_from_broker_without_truststore() { // given - producer.send(new ProducerRecord<>(TOPIC, 123, "my-test-value")); + producer.send(new ProducerRecord<>(uniqueTopic, 123, "my-test-value")); Map props = new HashMap<>(); props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase()); - Action sut = new KafkaBasicConsumeAction(targetBuilder.build(), TOPIC, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "10 s", null, null, logger); + Action consumeAction = getKafkaBasicConsumeAction(targetBuilder.build(), props, false); // when - ActionExecutionResult actionExecutionResult = sut.execute(); + ActionExecutionResult actionExecutionResult = consumeAction.execute(); // then assertThat(actionExecutionResult.status).isEqualTo(Success); @@ -110,9 +98,10 @@ public void should_consume_message_from_broker_without_truststore() { } @Test - public void consumer_from_target_with_truststore_should_reject_ssl_connection_with_broker_without_truststore_configured() { + public void consumer_from_target_with_truststore_should_reject_ssl_connection_with_broker_without_truststore_configured() throws URISyntaxException { // given - Target target = targetBuilder.withProperty("trustStore", KEYSTORE_JKS) + String keystore_jks = Paths.get(requireNonNull(HttpsServerStartActionTest.class.getResource("/security/server.jks")).toURI()).toAbsolutePath().toString(); + Target target = targetBuilder.withProperty("trustStore", keystore_jks) .withProperty("trustStorePassword", "server") .withProperty("security.protocol", "SSL") .build(); @@ -120,10 +109,10 @@ public void consumer_from_target_with_truststore_should_reject_ssl_connection_wi Map props = new HashMap<>(); props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP); - Action sut = new KafkaBasicConsumeAction(target, TOPIC, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "3000 ms", null, null,logger); + Action consumeAction = getKafkaBasicConsumeAction(target, props, false); // when - ActionExecutionResult actionExecutionResult = sut.execute(); + ActionExecutionResult actionExecutionResult = consumeAction.execute(); // then assertThat(actionExecutionResult.status).isEqualTo(Failure); @@ -137,38 +126,43 @@ public void should_reset_offset_to_the_beginning() { props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase()); - Action sut = new KafkaBasicConsumeAction(targetBuilder.build(), TOPIC, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "3000 ms", null,null, logger); + Action consumeAction = getKafkaBasicConsumeAction(targetBuilder.build(), props, false); - producer.send(new ProducerRecord<>(TOPIC, 123, "1")); + producer.send(new ProducerRecord<>(uniqueTopic, 123, "1")); - ActionExecutionResult actionExecutionResult = sut.execute(); + ActionExecutionResult actionExecutionResult = consumeAction.execute(); assertThat(actionExecutionResult.status).isEqualTo(Success); List> body = assertActionOutputsSize(actionExecutionResult, 1); assertThat(body.get(0).get("payload")).isEqualTo("1"); // second time - sut = new KafkaBasicConsumeAction(targetBuilder.build(), TOPIC, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "3000 ms", null, null,logger); - actionExecutionResult = sut.execute(); + consumeAction = getKafkaBasicConsumeAction(targetBuilder.build(), props, false); + actionExecutionResult = consumeAction.execute(); assertThat(actionExecutionResult.status).isEqualTo(Failure); // third time with reset - sut = new KafkaBasicConsumeAction(targetBuilder.build(), TOPIC, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "3000 ms", null, true,logger); - actionExecutionResult = sut.execute(); + Action consumeActionWithReset = getKafkaBasicConsumeAction(targetBuilder.build(), props, true); + actionExecutionResult = consumeActionWithReset.execute(); assertThat(actionExecutionResult.status).isEqualTo(Success); body = assertActionOutputsSize(actionExecutionResult, 1); assertThat(body.get(0).get("payload")).isEqualTo("1"); // third time without reset - sut = new KafkaBasicConsumeAction(targetBuilder.build(), TOPIC, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "3000 ms", null,null, logger); - actionExecutionResult = sut.execute(); + consumeAction = getKafkaBasicConsumeAction(targetBuilder.build(), props, false); + actionExecutionResult = consumeAction.execute(); assertThat(actionExecutionResult.status).isEqualTo(Failure); } + @NotNull + private KafkaBasicConsumeAction getKafkaBasicConsumeAction(Target target, Map props, boolean resetOffset) { + return new KafkaBasicConsumeAction(target, uniqueTopic, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "3000 ms", null, resetOffset, logger); + } + private List> assertActionOutputsSize(ActionExecutionResult actionExecutionResult, int size) { assertThat(actionExecutionResult.outputs).hasSize(3); @@ -189,8 +183,9 @@ private List> assertActionOutputsSize(ActionExecutionResult return body; } - private Producer createProducer() { - Map producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker)); + private static Producer createProducer(String brokerPath) { + Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerPath); return new DefaultKafkaProducerFactory<>(producerProps, new IntegerSerializer(), new StringSerializer()).createProducer(); } }