diff --git a/.github/workflows/build-main-branches.yml b/.github/workflows/build-main-branches.yml
index ee392d2f0b..d1a737bfea 100644
--- a/.github/workflows/build-main-branches.yml
+++ b/.github/workflows/build-main-branches.yml
@@ -18,6 +18,11 @@ jobs:
java: [
{'version': '8'},
{'version': '11'},
+ {
+ 'version': '11',
+ 'build_opts': '-Dkafka-container-version=latest-kafka-2.8.0 -pl smallrye-reactive-messaging-kafka --also-make',
+ 'name': 'Kafka tests against Kafka Broker 2.8.0'
+ },
{
'version': '16',
'opts': '--illegal-access=permit' # required for kotlin
diff --git a/.github/workflows/build-pull.yml b/.github/workflows/build-pull.yml
index 4b1983f767..6bbb08f8ca 100644
--- a/.github/workflows/build-pull.yml
+++ b/.github/workflows/build-pull.yml
@@ -16,6 +16,11 @@ jobs:
java: [
{'version': '8'},
{'version': '11'},
+ {
+ 'version': '11',
+ 'build_opts': '-Dkafka-container-version=latest-kafka-2.8.0 -pl smallrye-reactive-messaging-kafka --also-make',
+ 'name': 'Kafka tests against Kafka Broker 2.8.0'
+ },
{
'version': '16',
'opts': '--illegal-access=permit' # required for kotlin
diff --git a/pom.xml b/pom.xml
index 86e4a6c514..4034f8de80 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,7 @@
2.1.18.Final
- 2.8.0
+ 3.0.0
1.9.1
1.9.1-alpha
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java
index f8ec67f88d..4be037d49c 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java
@@ -219,7 +219,7 @@ public void testBroadcastWithPartitions() {
@Tag(TestTags.SLOW)
public void testRetry() {
// This test need an individual Kafka container
- try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION);) {
+ try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
MapBasedConfig config = newCommonConfigForSource()
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java
index e2580dbd0a..e38025aa25 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java
@@ -222,7 +222,7 @@ public void testBroadcastWithPartitions() {
@Tag(TestTags.SLOW)
public void testRetry() {
// This test need an individual Kafka container
- try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
+ try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
MapBasedConfig config = newCommonConfigForSource()
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaBrokerExtension.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaBrokerExtension.java
index a9a67d9fcf..52071dfab3 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaBrokerExtension.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaBrokerExtension.java
@@ -23,7 +23,7 @@
public class KafkaBrokerExtension implements BeforeAllCallback, ParameterResolver, CloseableResource {
public static final Logger LOGGER = Logger.getLogger(KafkaBrokerExtension.class.getName());
- public static final String KAFKA_VERSION = "latest-kafka-2.8.0";
+ public static final String KAFKA_VERSION = "latest-kafka-3.0.0";
private static boolean started = false;
static StrimziKafkaContainer kafka;
@@ -44,8 +44,13 @@ public void close() {
stopKafkaBroker();
}
+ public static String getKafkaContainerVersion() {
+ String kafkaContainerVersion = System.getProperty("kafka-container-version");
+ return kafkaContainerVersion != null ? kafkaContainerVersion : KAFKA_VERSION;
+ }
+
public static void startKafkaBroker() {
- kafka = new StrimziKafkaContainer(KAFKA_VERSION)
+ kafka = new StrimziKafkaContainer(getKafkaContainerVersion())
.withExposedPorts(9092);
kafka.start();
LOGGER.info("Kafka broker started: " + kafka.getBootstrapServers() + " (" + kafka.getMappedPort(9092) + ")");
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaToxiproxyExtension.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaToxiproxyExtension.java
index a75029ef67..15654443af 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaToxiproxyExtension.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaToxiproxyExtension.java
@@ -67,7 +67,7 @@ static int startKafkaProxy(Network network) {
}
static void startKafkaBroker(Network network, int proxyPort) {
- kafka = new ProxiedStrimziKafkaContainer(KAFKA_VERSION, proxyPort)
+ kafka = new ProxiedStrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion(), proxyPort)
.withNetwork(network)
.withNetworkAliases(KAFKA_NETWORK_ALIAS)
.withExposedPorts(KAFKA_PORT);
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaUsage.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaUsage.java
index eb7d8e5b81..3d21c92dcc 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaUsage.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/KafkaUsage.java
@@ -116,7 +116,7 @@ private static void sleep(Duration duration) {
*/
public static class FixedKafkaContainer extends StrimziKafkaContainer {
public FixedKafkaContainer(int port) {
- super(KafkaBrokerExtension.KAFKA_VERSION);
+ super(KafkaBrokerExtension.getKafkaContainerVersion());
super.addFixedExposedPort(port, KAFKA_PORT);
}
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java
index 64cf36ddbe..3c420ac528 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/BrokerRestartTest.java
@@ -46,7 +46,7 @@ public void tearDown() {
@Test
public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() throws Exception {
- try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
+ try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
@@ -71,7 +71,7 @@ public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() th
@Test
public void testResumingPausingWhileBrokerIsDown() throws Exception {
- try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
+ try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
Integer port = kafka.getMappedPort(KAFKA_PORT);
@@ -105,7 +105,7 @@ public void testResumingPausingWhileBrokerIsDown() throws Exception {
@Test
public void testPausingWhileBrokerIsDown() throws Exception {
- try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
+ try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
Integer port = kafka.getMappedPort(KAFKA_PORT);
@@ -163,7 +163,7 @@ public void testPausingWhileBrokerIsDown() throws Exception {
@Test
public void testWithBrokerRestart() throws Exception {
int sendBatchSize = 10;
- try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
+ try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
String groupId = UUID.randomUUID().toString();
MapBasedConfig config = createConsumerConfig(groupId)