Skip to content

Commit

Permalink
Kafka container version is overridable from system property
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Nov 30, 2021
1 parent 5d95ccd commit 00f9ff3
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public void testBroadcastWithPartitions() {
@Tag(TestTags.SLOW)
public void testRetry() {
// This test need an individual Kafka container
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION);) {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
MapBasedConfig config = newCommonConfigForSource()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void testBroadcastWithPartitions() {
@Tag(TestTags.SLOW)
public void testRetry() {
// This test need an individual Kafka container
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
MapBasedConfig config = newCommonConfigForSource()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,13 @@ public void close() {
stopKafkaBroker();
}

public static String getKafkaContainerVersion() {
String kafkaContainerVersion = System.getProperty("kafka-container-version");
return kafkaContainerVersion != null ? kafkaContainerVersion : KAFKA_VERSION;
}

public static void startKafkaBroker() {
kafka = new StrimziKafkaContainer(KAFKA_VERSION)
kafka = new StrimziKafkaContainer(getKafkaContainerVersion())
.withExposedPorts(9092);
kafka.start();
LOGGER.info("Kafka broker started: " + kafka.getBootstrapServers() + " (" + kafka.getMappedPort(9092) + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ static int startKafkaProxy(Network network) {
}

static void startKafkaBroker(Network network, int proxyPort) {
kafka = new ProxiedStrimziKafkaContainer(KAFKA_VERSION, proxyPort)
kafka = new ProxiedStrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion(), proxyPort)
.withNetwork(network)
.withNetworkAliases(KAFKA_NETWORK_ALIAS)
.withExposedPorts(KAFKA_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private static void sleep(Duration duration) {
*/
public static class FixedKafkaContainer extends StrimziKafkaContainer {
public FixedKafkaContainer(int port) {
super(KafkaBrokerExtension.KAFKA_VERSION);
super(KafkaBrokerExtension.getKafkaContainerVersion());
super.addFixedExposedPort(port, KAFKA_PORT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void tearDown() {

@Test
public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() throws Exception {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);

Expand All @@ -71,7 +71,7 @@ public void testAcknowledgementUsingThrottledStrategyEvenAfterBrokerRestart() th

@Test
public void testResumingPausingWhileBrokerIsDown() throws Exception {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
Integer port = kafka.getMappedPort(KAFKA_PORT);
Expand Down Expand Up @@ -105,7 +105,7 @@ public void testResumingPausingWhileBrokerIsDown() throws Exception {

@Test
public void testPausingWhileBrokerIsDown() throws Exception {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
await().until(kafka::isRunning);
Integer port = kafka.getMappedPort(KAFKA_PORT);
Expand Down Expand Up @@ -163,7 +163,7 @@ public void testPausingWhileBrokerIsDown() throws Exception {
@Test
public void testWithBrokerRestart() throws Exception {
int sendBatchSize = 10;
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.KAFKA_VERSION)) {
try (StrimziKafkaContainer kafka = new StrimziKafkaContainer(KafkaBrokerExtension.getKafkaContainerVersion())) {
kafka.start();
String groupId = UUID.randomUUID().toString();
MapBasedConfig config = createConsumerConfig(groupId)
Expand Down

0 comments on commit 00f9ff3

Please sign in to comment.