From 00f9ff3d71b6fd0cdeaa687db71fe89ef4927d7e Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 16 Nov 2021 15:10:14 +0300 Subject: [PATCH] Kafka container version is overridable from system property --- .../reactive/messaging/kafka/KafkaSourceTest.java | 2 +- .../kafka/KafkaSourceWithLegacyMetadataTest.java | 2 +- .../messaging/kafka/base/KafkaBrokerExtension.java | 7 ++++++- .../messaging/kafka/base/KafkaToxiproxyExtension.java | 2 +- .../reactive/messaging/kafka/base/KafkaUsage.java | 2 +- .../messaging/kafka/client/BrokerRestartTest.java | 8 ++++---- 6 files changed, 14 insertions(+), 9 deletions(-) diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java index f8ec67f88d..4be037d49c 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java @@ -219,7 +219,7 @@ public void testBroadcastWithPartitions() { @Tag(TestTags.SLOW) public void testRetry() { // This test need an individual Kafka container - try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION);) { + try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) { kafka.start(); await().until(kafka::isRunning); MapBasedConfig config = newCommonConfigForSource() diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java index e2580dbd0a..e38025aa25 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java @@ -222,7 +222,7 @@ public void testBroadcastWithPartitions() { @Tag(TestTags.SLOW) public void testRetry() { // This test need an individual Kafka container - try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) { + try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) { kafka.start(); await().until(kafka::isRunning); MapBasedConfig config = newCommonConfigForSource() diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaBrokerExtension.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaBrokerExtension.java index fd74ecb91b..52071dfab3 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaBrokerExtension.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaBrokerExtension.java @@ -44,8 +44,13 @@ public void close() { stopKafkaBroker(); } + public static String getKafkaContainerVersion() { + String kafkaContainerVersion = System.getProperty("kafka-container-version"); + return kafkaContainerVersion != null ? kafkaContainerVersion : KAFKA_VERSION; + } + public static void startKafkaBroker() { - kafka = new StrimziKafkaContainer(KAFKA_VERSION) + kafka = new StrimziKafkaContainer(getKafkaContainerVersion()) .withExposedPorts(9092); kafka.start(); LOGGER.info("Kafka broker started: " + kafka.getBootstrapServers() + " (" + kafka.getMappedPort(9092) + ")"); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaToxiproxyExtension.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaToxiproxyExtension.java index a75029ef67..15654443af 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaToxiproxyExtension.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaToxiproxyExtension.java @@ -67,7 +67,7 @@ static int startKafkaProxy(Network network) { } static void startKafkaBroker(Network network, int proxyPort) { - kafka = new ProxiedStrimziKafkaContainer(KAFKA_VERSION, proxyPort) + kafka = new ProxiedStrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion(), proxyPort) .withNetwork(network) .withNetworkAliases(KAFKA_NETWORK_ALIAS) .withExposedPorts(KAFKA_PORT); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaUsage.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaUsage.java index eb7d8e5b81..3d21c92dcc 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaUsage.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaUsage.java @@ -116,7 +116,7 @@ private static void sleep(Duration duration) { */ public static class FixedKafkaContainer extends StrimziKafkaContainer { public FixedKafkaContainer(int port) { - super(KafkaBrokerExtension.KAFKA_VERSION); + super(KafkaBrokerExtension.getKafkaContainerVersion()); super.addFixedExposedPort(port, KAFKA_PORT); } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java index 64cf36ddbe..3c420ac528 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java @@ -46,7 +46,7 @@ public void tearDown() { @Test public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() throws Exception { - try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) { + try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) { kafka.start(); await().until(kafka::isRunning); @@ -71,7 +71,7 @@ public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() th @Test public void testResumingPausingWhileBrokerIsDown() throws Exception { - try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) { + try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) { kafka.start(); await().until(kafka::isRunning); Integer port = kafka.getMappedPort(KAFKA_PORT); @@ -105,7 +105,7 @@ public void testResumingPausingWhileBrokerIsDown() throws Exception { @Test public void testPausingWhileBrokerIsDown() throws Exception { - try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) { + try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) { kafka.start(); await().until(kafka::isRunning); Integer port = kafka.getMappedPort(KAFKA_PORT); @@ -163,7 +163,7 @@ public void testPausingWhileBrokerIsDown() throws Exception { @Test public void testWithBrokerRestart() throws Exception { int sendBatchSize = 10; - try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) { + try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) { kafka.start(); String groupId = UUID.randomUUID().toString(); MapBasedConfig config = createConsumerConfig(groupId)