Skip to content

Commit

Permalink
Merge pull request #1493 from ozangunalp/kafka-clients-3.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier authored Dec 1, 2021
2 parents acd0c16 + 746ae3e commit ed5081f
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/build-main-branches.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ jobs:
java: [
{'version': '8'},
{'version': '11'},
{
'version': '11',
'build_opts': '-Dkafka-container-version=latest-kafka-2.8.0 -pl smallrye-reactive-messaging-kafka --also-make',
'name': 'Kafka tests against Kafka Broker 2.8.0'
},
{
'version': '16',
'opts': '--illegal-access=permit' # required for kotlin
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/build-pull.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ jobs:
java: [
{'version': '8'},
{'version': '11'},
{
'version': '11',
'build_opts': '-Dkafka-container-version=latest-kafka-2.8.0 -pl smallrye-reactive-messaging-kafka --also-make',
'name': 'Kafka tests against Kafka Broker 2.8.0'
},
{
'version': '16',
'opts': '--illegal-access=permit' # required for kotlin
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@

<jboss-log-manager.version>2.1.18.Final</jboss-log-manager.version>

<kafka.version>2.8.0</kafka.version>
<kafka.version>3.0.0</kafka.version>

<opentelemetry.version>1.9.1</opentelemetry.version>
<opentelemetry-semver.version>1.9.1-alpha</opentelemetry-semver.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public void testBroadcastWithPartitions() {
@Tag(TestTags.SLOW)
public void testRetry() {
// This test need an individual Kafka container
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION);) {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
MapBasedConfig config = newCommonConfigForSource()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void testBroadcastWithPartitions() {
@Tag(TestTags.SLOW)
public void testRetry() {
// This test need an individual Kafka container
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
MapBasedConfig config = newCommonConfigForSource()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public class KafkaBrokerExtension implements BeforeAllCallback, ParameterResolver, CloseableResource {
public static final Logger LOGGER = Logger.getLogger(KafkaBrokerExtension.class.getName());

public static final String KAFKA_VERSION = "latest-kafka-2.8.0";
public static final String KAFKA_VERSION = "latest-kafka-3.0.0";

private static boolean started = false;
static StrimziKafkaContainer kafka;
Expand All @@ -44,8 +44,13 @@ public void close() {
stopKafkaBroker();
}

public static String getKafkaContainerVersion() {
String kafkaContainerVersion = System.getProperty("kafka-container-version");
return kafkaContainerVersion != null ? kafkaContainerVersion : KAFKA_VERSION;
}

public static void startKafkaBroker() {
kafka = new StrimziKafkaContainer(KAFKA_VERSION)
kafka = new StrimziKafkaContainer(getKafkaContainerVersion())
.withExposedPorts(9092);
kafka.start();
LOGGER.info("Kafka broker started: " + kafka.getBootstrapServers() + " (" + kafka.getMappedPort(9092) + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ static int startKafkaProxy(Network network) {
}

static void startKafkaBroker(Network network, int proxyPort) {
kafka = new ProxiedStrimziKafkaContainer(KAFKA_VERSION, proxyPort)
kafka = new ProxiedStrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion(), proxyPort)
.withNetwork(network)
.withNetworkAliases(KAFKA_NETWORK_ALIAS)
.withExposedPorts(KAFKA_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private static void sleep(Duration duration) {
*/
public static class FixedKafkaContainer extends StrimziKafkaContainer {
public FixedKafkaContainer(int port) {
super(KafkaBrokerExtension.KAFKA_VERSION);
super(KafkaBrokerExtension.getKafkaContainerVersion());
super.addFixedExposedPort(port, KAFKA_PORT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void tearDown() {

@Test
public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() throws Exception {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);

Expand All @@ -71,7 +71,7 @@ public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() th

@Test
public void testResumingPausingWhileBrokerIsDown() throws Exception {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
Integer port = kafka.getMappedPort(KAFKA_PORT);
Expand Down Expand Up @@ -105,7 +105,7 @@ public void testResumingPausingWhileBrokerIsDown() throws Exception {

@Test
public void testPausingWhileBrokerIsDown() throws Exception {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
Integer port = kafka.getMappedPort(KAFKA_PORT);
Expand Down Expand Up @@ -163,7 +163,7 @@ public void testPausingWhileBrokerIsDown() throws Exception {
@Test
public void testWithBrokerRestart() throws Exception {
int sendBatchSize = 10;
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
String groupId = UUID.randomUUID().toString();
MapBasedConfig config = createConsumerConfig(groupId)
Expand Down

0 comments on commit ed5081f

Please sign in to comment.