Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 9, 2024
1 parent 8ce91c6 commit 9241317
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
8 changes: 6 additions & 2 deletions streams-bootstrap-picocli/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ dependencies {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2")
val confluentVersion: String by project
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
val fluentKafkaVersion: String by project
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "schema-registry-mock-junit5",
version = fluentKafkaVersion
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.bakdata.kafka.ProducerRunnable;
import com.bakdata.kafka.SimpleKafkaProducerApplication;
import com.bakdata.kafka.TestRecord;
import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import java.util.Map;
Expand All @@ -50,9 +51,12 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class RunProducerAppTest {
private static final int TIMEOUT_SECONDS = 10;
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
private final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();

@BeforeEach
Expand Down Expand Up @@ -88,7 +92,7 @@ public Map<String, Object> createKafkaProperties() {
}
})) {
app.setBrokers(this.kafkaCluster.getBrokerList());
app.setSchemaRegistryUrl("mock://");
app.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl());
app.setOutputTopic(output);
app.setKafkaConfig(Map.of(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
Expand All @@ -98,7 +102,8 @@ public Map<String, Object> createKafkaProperties() {
assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, TestRecord.class)
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://")
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.build()))
.hasSize(1)
.anySatisfy(kv -> {
Expand Down

0 comments on commit 9241317

Please sign in to comment.