Skip to content

Commit

Permalink
feat(server): extensible kafka basic consume action test
Browse files Browse the repository at this point in the history
  • Loading branch information
KarimGl committed Mar 20, 2024
1 parent 6198afe commit 376015d
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2017-2023 Enedis
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.chutneytesting.action.kafka;

import org.junit.jupiter.api.AfterAll;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;

public class EmbeddedKafkaBasicConsumeActionIntegrationTest extends KafkaBasicConsumeActionIntegrationTest {

private static EmbeddedKafkaBroker embeddedKafkaBroker;
@Override
protected String initBroker() {
embeddedKafkaBroker = new EmbeddedKafkaZKBroker(1);
embeddedKafkaBroker.afterPropertiesSet();
return embeddedKafkaBroker.getBrokersAsString();
}

@AfterAll
public static void afterAll() {
producer.close();
embeddedKafkaBroker.destroy();
System.out.println("after all child");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static com.chutneytesting.action.spi.ActionExecutionResult.Status.Success;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.kafka.test.utils.KafkaTestUtils.getCurrentOffset;
import static org.springframework.util.MimeTypeUtils.TEXT_PLAIN_VALUE;

import com.chutneytesting.action.TestLogger;
Expand All @@ -39,69 +38,58 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

public class KafkaBasicConsumeActionIntegrationTest {
public abstract class KafkaBasicConsumeActionIntegrationTest {

private static final String TOPIC = "topic";
private static final String GROUP = "mygroup";
private static String KEYSTORE_JKS;
private EmbeddedKafkaZKBroker embeddedKafkaBroker;
private final String GROUP = "mygroup";
private String uniqueTopic;

private Producer<Integer, String> producer;
private TestTarget.TestTargetBuilder targetBuilder;
protected static Producer<Integer, String> producer;
private final TestTarget.TestTargetBuilder targetBuilder;

private TestLogger logger;

@BeforeAll
static void beforeAll() throws URISyntaxException {
KEYSTORE_JKS = Paths.get(requireNonNull(HttpsServerStartActionTest.class.getResource("/security/server.jks")).toURI()).toAbsolutePath().toString();
public KafkaBasicConsumeActionIntegrationTest() {
String brokerPath = initBroker();
producer = createProducer(brokerPath);
targetBuilder = TestTarget.TestTargetBuilder.builder().withTargetId("kafka").withUrl("tcp://" + brokerPath);
}

protected abstract String initBroker();

@BeforeEach
public void before() {
logger = new TestLogger();
embeddedKafkaBroker = new EmbeddedKafkaZKBroker(1, false, TOPIC);
embeddedKafkaBroker.afterPropertiesSet();
producer = createProducer();
targetBuilder = TestTarget.TestTargetBuilder.builder().withTargetId("kafka").withUrl("tcp://" + embeddedKafkaBroker.getBrokersAsString());
}

@AfterEach
void tearDown() {
producer.close();
embeddedKafkaBroker.destroy();
uniqueTopic = UUID.randomUUID().toString();
}

@Test
public void should_consume_message_from_broker_without_truststore() {

// given
producer.send(new ProducerRecord<>(TOPIC, 123, "my-test-value"));
producer.send(new ProducerRecord<>(uniqueTopic, 123, "my-test-value"));

Map<String, String> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase());

Action sut = new KafkaBasicConsumeAction(targetBuilder.build(), TOPIC, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "10 s", null, null, logger);
Action consumeAction = getKafkaBasicConsumeAction(targetBuilder.build(), props, false);


// when
ActionExecutionResult actionExecutionResult = sut.execute();
ActionExecutionResult actionExecutionResult = consumeAction.execute();

// then
assertThat(actionExecutionResult.status).isEqualTo(Success);
Expand All @@ -110,20 +98,21 @@ public void should_consume_message_from_broker_without_truststore() {
}

@Test
public void consumer_from_target_with_truststore_should_reject_ssl_connection_with_broker_without_truststore_configured() {
public void consumer_from_target_with_truststore_should_reject_ssl_connection_with_broker_without_truststore_configured() throws URISyntaxException {
// given
Target target = targetBuilder.withProperty("trustStore", KEYSTORE_JKS)
String keystore_jks = Paths.get(requireNonNull(HttpsServerStartActionTest.class.getResource("/security/server.jks")).toURI()).toAbsolutePath().toString();
Target target = targetBuilder.withProperty("trustStore", keystore_jks)
.withProperty("trustStorePassword", "server")
.withProperty("security.protocol", "SSL")
.build();

Map<String, String> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);

Action sut = new KafkaBasicConsumeAction(target, TOPIC, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "3000 ms", null, null,logger);
Action consumeAction = getKafkaBasicConsumeAction(target, props, false);

// when
ActionExecutionResult actionExecutionResult = sut.execute();
ActionExecutionResult actionExecutionResult = consumeAction.execute();

// then
assertThat(actionExecutionResult.status).isEqualTo(Failure);
Expand All @@ -137,38 +126,42 @@ public void should_reset_offset_to_the_beginning() {
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase());

Action sut = new KafkaBasicConsumeAction(targetBuilder.build(), TOPIC, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "3000 ms", null,null, logger);
Action consumeAction = getKafkaBasicConsumeAction(targetBuilder.build(), props, false);

producer.send(new ProducerRecord<>(TOPIC, 123, "1"));
producer.send(new ProducerRecord<>(uniqueTopic, 123, "1"));


ActionExecutionResult actionExecutionResult = sut.execute();
ActionExecutionResult actionExecutionResult = consumeAction.execute();
assertThat(actionExecutionResult.status).isEqualTo(Success);
List<Map<String, Object>> body = assertActionOutputsSize(actionExecutionResult, 1);
assertThat(body.get(0).get("payload")).isEqualTo("1");

// second time
sut = new KafkaBasicConsumeAction(targetBuilder.build(), TOPIC, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "3000 ms", null, null,logger);
actionExecutionResult = sut.execute();
consumeAction = getKafkaBasicConsumeAction(targetBuilder.build(), props, false);
actionExecutionResult = consumeAction.execute();

assertThat(actionExecutionResult.status).isEqualTo(Failure);

// third time with reset
sut = new KafkaBasicConsumeAction(targetBuilder.build(), TOPIC, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "3000 ms", null, true,logger);
actionExecutionResult = sut.execute();
Action consumeActionWithReset = getKafkaBasicConsumeAction(targetBuilder.build(), props, true);
actionExecutionResult = consumeActionWithReset.execute();

assertThat(actionExecutionResult.status).isEqualTo(Success);
body = assertActionOutputsSize(actionExecutionResult, 1);
assertThat(body.get(0).get("payload")).isEqualTo("1");

// third time without reset
sut = new KafkaBasicConsumeAction(targetBuilder.build(), TOPIC, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "3000 ms", null,null, logger);
actionExecutionResult = sut.execute();
consumeAction = getKafkaBasicConsumeAction(targetBuilder.build(), props, false);
actionExecutionResult = consumeAction.execute();

assertThat(actionExecutionResult.status).isEqualTo(Failure);

}

private KafkaBasicConsumeAction getKafkaBasicConsumeAction(Target target, Map<String, String> props, boolean resetOffset) {
return new KafkaBasicConsumeAction(target, uniqueTopic, GROUP, props, 1, null, null, TEXT_PLAIN_VALUE, "3000 ms", null, resetOffset, logger);
}

private List<Map<String, Object>> assertActionOutputsSize(ActionExecutionResult actionExecutionResult, int size) {
assertThat(actionExecutionResult.outputs).hasSize(3);

Expand All @@ -189,8 +182,9 @@ private List<Map<String, Object>> assertActionOutputsSize(ActionExecutionResult
return body;
}

private Producer<Integer, String> createProducer() {
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
private static Producer<Integer, String> createProducer(String brokerPath) {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerPath);
return new DefaultKafkaProducerFactory<>(producerProps, new IntegerSerializer(), new StringSerializer()).createProducer();
}
}

0 comments on commit 376015d

Please sign in to comment.