diff --git a/streams-bootstrap-picocli/build.gradle.kts b/streams-bootstrap-picocli/build.gradle.kts
index d8ba02d4..d18c6b8e 100644
--- a/streams-bootstrap-picocli/build.gradle.kts
+++ b/streams-bootstrap-picocli/build.gradle.kts
@@ -25,6 +25,10 @@ dependencies {
         exclude(group = "org.slf4j", module = "slf4j-log4j12")
     }
     testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2")
-    val confluentVersion: String by project
-    testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
+    val fluentKafkaVersion: String by project
+    testImplementation(
+        group = "com.bakdata.fluent-kafka-streams-tests",
+        name = "schema-registry-mock-junit5",
+        version = fluentKafkaVersion
+    )
 }
diff --git a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java
index ee3d59a7..57b51919 100644
--- a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java
+++ b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java
@@ -34,6 +34,7 @@
 import com.bakdata.kafka.ProducerRunnable;
 import com.bakdata.kafka.SimpleKafkaProducerApplication;
 import com.bakdata.kafka.TestRecord;
+import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
 import java.util.Map;
@@ -50,9 +51,12 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 class RunProducerAppTest {
     private static final int TIMEOUT_SECONDS = 10;
+    @RegisterExtension
+    final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
     private final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();
 
     @BeforeEach
@@ -88,7 +92,7 @@ public Map createKafkaProperties() {
             }
         })) {
             app.setBrokers(this.kafkaCluster.getBrokerList());
-            app.setSchemaRegistryUrl("mock://");
+            app.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl());
             app.setOutputTopic(output);
             app.setKafkaConfig(Map.of(
                     ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
@@ -98,7 +102,8 @@ public Map createKafkaProperties() {
         assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, TestRecord.class)
                 .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                 .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class)
-                .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://")
+                .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
+                        this.schemaRegistryMockExtension.getUrl())
                 .build()))
                 .hasSize(1)
                 .anySatisfy(kv -> {