Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default serialization config to apps #239

Merged
merged 15 commits into from
Jul 25, 2024
22 changes: 18 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ and `getUniqueAppId()`. You can define the topology of your application in `buil

```java
import com.bakdata.kafka.KafkaStreamsApplication;
import com.bakdata.kafka.SerdeConfig;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsBootstrapApplication extends KafkaStreamsApplication {
public class MyStreamsApplication extends KafkaStreamsApplication {
public static void main(final String[] args) {
startApplication(new StreamsBootstrapApplication(), args);
startApplication(new MyStreamsApplication(), args);
}

@Override
Expand All @@ -86,6 +88,11 @@ public class StreamsBootstrapApplication extends KafkaStreamsApplication {
return "streams-bootstrap-app-" + topics.getOutputTopic();
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}

// Optionally you can define custom Kafka properties
@Override
public Map<String, Object> createKafkaProperties() {
Expand Down Expand Up @@ -142,12 +149,14 @@ import com.bakdata.kafka.KafkaProducerApplication;
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.ProducerRunnable;
import com.bakdata.kafka.SerializerConfig;
import java.util.Map;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

public class StreamsBootstrapApplication extends KafkaProducerApplication {
public class MyProducerApplication extends KafkaProducerApplication {
public static void main(final String[] args) {
startApplication(new StreamsBootstrapApplication(), args);
startApplication(new MyProducerApplication(), args);
}

@Override
Expand All @@ -162,6 +171,11 @@ public class StreamsBootstrapApplication extends KafkaProducerApplication {
};
}

@Override
public SerializerConfig defaultSerializationConfig() {
return new SerializerConfig(StringSerializer.class, StringSerializer.class);
}

// Optionally you can define custom Kafka properties
@Override
public Map<String, Object> createKafkaProperties() {
Expand Down
41 changes: 41 additions & 0 deletions streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import net.mguenther.kafka.junit.SendKeyValues;
import net.mguenther.kafka.junit.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.Consumed;
import org.junit.jupiter.api.Test;

Expand All @@ -63,6 +64,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}

Expand Down Expand Up @@ -91,6 +97,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
}), new String[]{
"--brokers", "localhost:9092",
"--schema-registry-url", "http://localhost:8081",
Expand All @@ -115,6 +126,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}

Expand Down Expand Up @@ -147,6 +163,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}

Expand Down Expand Up @@ -179,6 +200,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
return "app";
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
})) {
kafkaCluster.start();
kafkaCluster.createTopic(TopicConfig.withName(input).build());
Expand Down Expand Up @@ -210,6 +236,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
return "app";
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}
})) {
kafkaCluster.start();
kafkaCluster.createTopic(TopicConfig.withName(input).build());
Expand Down Expand Up @@ -249,6 +280,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}
}, new String[]{
Expand All @@ -275,6 +311,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

@NoArgsConstructor
Expand Down Expand Up @@ -57,6 +58,11 @@ public String getUniqueAppId(final StreamsTopicConfig topics) {
return CloseFlagApp.this.getClass().getSimpleName() + "-" + topics.getOutputTopic();
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}

@Override
public void close() {
CloseFlagApp.this.appClosed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,20 @@
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.ProducerRunnable;
import com.bakdata.kafka.SerializerConfig;
import com.bakdata.kafka.SimpleKafkaProducerApplication;
import com.bakdata.kafka.TestRecord;
import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.ReadKeyValues;
import net.mguenther.kafka.junit.TopicConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
Expand Down Expand Up @@ -85,10 +86,8 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
}

@Override
public Map<String, Object> createKafkaProperties() {
return Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class
);
public SerializerConfig defaultSerializationConfig() {
return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class);
}
})) {
app.setBrokers(this.kafkaCluster.getBrokerList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@

package com.bakdata.kafka.test_applications;

import com.bakdata.kafka.SerdeConfig;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import lombok.NoArgsConstructor;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

@NoArgsConstructor
Expand All @@ -43,4 +45,9 @@ public String getUniqueAppId(final StreamsTopicConfig topics) {
return this.getClass().getSimpleName() + "-" + topics.getOutputTopic();
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@

package com.bakdata.kafka.test_applications;

import com.bakdata.kafka.SerdeConfig;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import java.util.Arrays;
import java.util.regex.Pattern;
import lombok.NoArgsConstructor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
Expand All @@ -56,4 +58,9 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
return this.getClass().getSimpleName() + "-" + topics.getOutputTopic();
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}
}
3 changes: 2 additions & 1 deletion streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies {
api(group = "org.apache.kafka", name = "kafka-streams", version = kafkaVersion)
api(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion)
val confluentVersion: String by project
implementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
implementation(group = "io.confluent", name = "kafka-schema-serializer", version = confluentVersion)
api(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion)
api(
group = "org.slf4j",
Expand Down Expand Up @@ -42,6 +42,7 @@ dependencies {
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
* @param <T> type of topic config
* @param <C> type of clean up config
*/
@FunctionalInterface
public interface App<T, C> extends AutoCloseable {

/**
Expand Down Expand Up @@ -63,4 +62,10 @@ default Map<String, Object> createKafkaProperties() {
default void setup(final EffectiveAppConfiguration<T> configuration) {
// do nothing by default
}

/**
* Configure default serialization behavior
* @return {@code SerializationConfig}
*/
SerializationConfig defaultSerializationConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@

import static java.util.Collections.emptyMap;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

/**
* A {@link ProducerApp} with a corresponding {@link AppConfiguration}
Expand All @@ -45,17 +43,9 @@ public class ConfiguredProducerApp<T extends ProducerApp> implements ConfiguredA
private final @NonNull T app;
private final @NonNull AppConfiguration<ProducerTopicConfig> configuration;

private static Map<String, Object> createBaseConfig(final KafkaEndpointConfig endpointConfig) {
private static Map<String, Object> createBaseConfig() {
final Map<String, Object> kafkaConfig = new HashMap<>();

if (endpointConfig.isSchemaRegistryConfigured()) {
kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class);
kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class);
} else {
kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
}

kafkaConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
kafkaConfig.put(ProducerConfig.ACKS_CONFIG, "all");

Expand All @@ -70,12 +60,6 @@ private static Map<String, Object> createBaseConfig(final KafkaEndpointConfig en
* Configuration is created in the following order
* <ul>
* <li>
* {@link ProducerConfig#KEY_SERIALIZER_CLASS_CONFIG} and
* {@link ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG} are configured based on
* {@link KafkaEndpointConfig#isSchemaRegistryConfigured()}.
* If Schema Registry is configured, {@link SpecificAvroSerializer} is used, otherwise
* {@link StringSerializer} is used.
* Additionally, the following is configured:
* <pre>
* max.in.flight.requests.per.connection=1
* acks=all
Expand All @@ -95,6 +79,11 @@ private static Map<String, Object> createBaseConfig(final KafkaEndpointConfig en
* <li>
* Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()}
* </li>
* <li>
* {@link ProducerConfig#KEY_SERIALIZER_CLASS_CONFIG} and
* {@link ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG} is configured using
* {@link ProducerApp#defaultSerializationConfig()}
* </li>
* </ul>
*
* @param endpointConfig endpoint to run app on
Expand Down Expand Up @@ -130,7 +119,7 @@ public void close() {
}

private KafkaPropertiesFactory createPropertiesFactory(final KafkaEndpointConfig endpointConfig) {
final Map<String, Object> baseConfig = createBaseConfig(endpointConfig);
final Map<String, Object> baseConfig = createBaseConfig();
return KafkaPropertiesFactory.builder()
.baseConfig(baseConfig)
.app(this.app)
Expand Down
Loading
Loading