Skip to content

Commit

Permalink
Use MockSchemaRegistry and remove custom mock
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 8, 2025
1 parent cde20aa commit c22c657
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package com.bakdata.fluent_kafka_streams_tests.junit4;

import com.bakdata.fluent_kafka_streams_tests.TestTopology;
import com.bakdata.schemaregistrymock.SchemaRegistryMock;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -105,8 +104,8 @@ protected TestTopologyRule(
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
final Function<? super String, ? extends Map<String, ?>> propertiesFactory,
final Serde<DefaultK> defaultKeySerde, final Serde<DefaultV> defaultValueSerde,
final SchemaRegistryMock schemaRegistryMock) {
super(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, schemaRegistryMock);
final String schemaRegistryUrl) {
super(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, schemaRegistryUrl);
}

@Override
Expand All @@ -125,13 +124,8 @@ public void evaluate() throws Throwable {
}

@Override
protected <K, V> TestTopologyRule<K, V> with(
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
final Function<? super String, ? extends Map<String, ?>> propertiesFactory, final Serde<K> defaultKeySerde,
final Serde<V> defaultValueSerde,
final SchemaRegistryMock schemaRegistryMock) {
return new TestTopologyRule<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde,
schemaRegistryMock);
public TestTopologyRule<DefaultK, DefaultV> withSchemaRegistryUrl(final String schemaRegistryUrl) {
return (TestTopologyRule<DefaultK, DefaultV>) super.withSchemaRegistryUrl(schemaRegistryUrl);
}

@Override
Expand All @@ -151,8 +145,13 @@ public <K, V> TestTopologyRule<K, V> withDefaultSerde(final Serde<K> defaultKeyS
}

@Override
public TestTopologyRule<DefaultK, DefaultV> withSchemaRegistryUrl(final SchemaRegistryMock schemaRegistryMock) {
return (TestTopologyRule<DefaultK, DefaultV>) super.withSchemaRegistryUrl(schemaRegistryMock);
protected <K, V> TestTopologyRule<K, V> with(
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
final Function<? super String, ? extends Map<String, ?>> propertiesFactory, final Serde<K> defaultKeySerde,
final Serde<V> defaultValueSerde,
final String schemaRegistryUrl) {
return new TestTopologyRule<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde,
schemaRegistryUrl);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
package com.bakdata.fluent_kafka_streams_tests.junit4;

import com.bakdata.fluent_kafka_streams_tests.junit4.test_applications.WordCount;
import com.bakdata.schemaregistrymock.SchemaRegistryMock;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -39,7 +36,7 @@ public class WordCountWitherTest {
public final TestTopologyRule<Object, String> testTopology =
new TestTopologyRule<>(this.app.getTopology(), WordCount.getKafkaProperties())
.withDefaultValueSerde(Serdes.String())
.withSchemaRegistryUrl(new SchemaRegistryMock(List.of(new AvroSchemaProvider())));
.withSchemaRegistryUrl("mock://my-scope");

@Test
public void shouldAggregateSameWordStream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package com.bakdata.fluent_kafka_streams_tests.junit5;

import com.bakdata.fluent_kafka_streams_tests.TestTopology;
import com.bakdata.schemaregistrymock.SchemaRegistryMock;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -106,8 +105,8 @@ protected TestTopologyExtension(
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
final Function<? super String, ? extends Map<String, ?>> propertiesFactory,
final Serde<DefaultK> defaultKeySerde, final Serde<DefaultV> defaultValueSerde,
final SchemaRegistryMock schemaRegistryMock) {
super(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, schemaRegistryMock);
final String schemaRegistryUrl) {
super(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, schemaRegistryUrl);
}

@Override
Expand All @@ -121,13 +120,9 @@ public void beforeEach(final ExtensionContext context) {
}

@Override
protected <K, V> TestTopology<K, V> with(
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
final Function<? super String, ? extends Map<String, ?>> propertiesFactory, final Serde<K> defaultKeySerde,
final Serde<V> defaultValueSerde,
final SchemaRegistryMock schemaRegistry) {
return new TestTopologyExtension<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde,
schemaRegistry);
public TestTopologyExtension<DefaultK, DefaultV> withSchemaRegistryUrl(
final String schemaRegistryUrl) {
return (TestTopologyExtension<DefaultK, DefaultV>) super.withSchemaRegistryUrl(schemaRegistryUrl);
}

@Override
Expand All @@ -147,8 +142,12 @@ public <K, V> TestTopologyExtension<K, V> withDefaultSerde(final Serde<K> defaul
}

@Override
public TestTopologyExtension<DefaultK, DefaultV> withSchemaRegistryUrl(
final SchemaRegistryMock schemaRegistryMock) {
return (TestTopologyExtension<DefaultK, DefaultV>) super.withSchemaRegistryUrl(schemaRegistryMock);
protected <K, V> TestTopology<K, V> with(
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
final Function<? super String, ? extends Map<String, ?>> propertiesFactory, final Serde<K> defaultKeySerde,
final Serde<V> defaultValueSerde,
final String schemaRegistryUrl) {
return new TestTopologyExtension<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde,
schemaRegistryUrl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
package com.bakdata.fluent_kafka_streams_tests.junit5;

import com.bakdata.fluent_kafka_streams_tests.junit5.test_applications.WordCount;
import com.bakdata.schemaregistrymock.SchemaRegistryMock;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -39,7 +36,7 @@ class WordCountWitherTest {
final TestTopologyExtension<Object, String> testTopology =
new TestTopologyExtension<>(this.app::getTopology, WordCount.getKafkaProperties())
.withDefaultValueSerde(Serdes.String())
.withSchemaRegistryUrl(new SchemaRegistryMock(List.of(new AvroSchemaProvider())));
.withSchemaRegistryUrl("mock://my-scope");

@Test
void shouldAggregateSameWordStream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
*/
@Getter
public class TestTopology<DefaultK, DefaultV> {
private static final String DEFAULT_SCHEMA_REGISTRY_URL = "mock://";
private final String schemaRegistryUrl;
private final Function<? super Map<String, Object>, ? extends Topology> topologyFactory;
private final Map<String, Object> properties = new HashMap<>();
Expand Down Expand Up @@ -146,7 +147,7 @@ protected TestTopology(final Function<? super Map<String, Object>, ? extends Top
*/
public TestTopology(final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
final Function<? super String, ? extends Map<String, ?>> propertiesFactory) {
this(topologyFactory, propertiesFactory, null, null, "mock://");
this(topologyFactory, propertiesFactory, null, null, DEFAULT_SCHEMA_REGISTRY_URL);
}

/**
Expand Down

0 comments on commit c22c657

Please sign in to comment.