
Commit: Create v3
philipp94831 committed Apr 5, 2024
1 parent 042757e commit 990bd4c
Showing 17 changed files with 100 additions and 210 deletions.
4 changes: 0 additions & 4 deletions charts/streams-app-cleanup-job/templates/job.yaml
@@ -75,10 +75,6 @@ spec:
  - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
    value: {{ .Values.streams.schemaRegistryUrl | quote }}
  {{- end }}
- {{- if hasKey .Values.streams "productive" }}
- - name: "{{ .Values.configurationEnvPrefix }}_PRODUCTIVE"
-   value: {{ .Values.streams.productive | quote }}
- {{- end }}
  {{- if hasKey .Values "debug" }}
  - name: "{{ .Values.configurationEnvPrefix }}_DEBUG"
    value: {{ .Values.debug | quote }}
1 change: 0 additions & 1 deletion charts/streams-app-cleanup-job/values.yaml
@@ -33,7 +33,6 @@ streams:
  extraOutputTopics: {}
  # role: output
  # errorTopic: error
- # productive: true
  deleteOutput: false

  commandLine: {}
4 changes: 0 additions & 4 deletions charts/streams-app/templates/deployment.yaml
@@ -138,10 +138,6 @@ spec:
  - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
    value: {{ .Values.streams.schemaRegistryUrl | quote }}
  {{- end }}
- {{- if hasKey .Values.streams "productive" }}
- - name: "{{ .Values.configurationEnvPrefix }}_PRODUCTIVE"
-   value: {{ .Values.streams.productive | quote }}
- {{- end }}
  {{- if hasKey .Values "debug" }}
  - name: "{{ .Values.configurationEnvPrefix }}_DEBUG"
    value: {{ .Values.debug | quote }}
1 change: 0 additions & 1 deletion charts/streams-app/values.yaml
@@ -44,7 +44,6 @@ streams:
  extraOutputTopics: {}
  # role: output
  # errorTopic: error
- # productive: true

  commandLine: {}
  # MY_CLI_PARAM: "foo-bar"
@@ -53,7 +53,6 @@
  * <li>{@link #errorTopic}</li>
  * <li>{@link #extraInputTopics}</li>
  * <li>{@link #extraInputPatterns}</li>
- * <li>{@link #productive}</li>
  * <li>{@link #volatileGroupInstanceId}</li>
  * </ul>
  * To implement your Kafka Streams application inherit from this class and add your custom options. Run it by calling
@@ -77,10 +76,6 @@ public abstract class KafkaStreamsApplication extends KafkaApplication {
  private Map<String, List<String>> extraInputTopics = new HashMap<>();
  @CommandLine.Option(names = "--extra-input-patterns", split = ",", description = "Additional named input patterns")
  private Map<String, Pattern> extraInputPatterns = new HashMap<>();
- @CommandLine.Option(names = "--productive", arity = "0..1",
-         description = "Whether to use Kafka Streams configuration values, such as replication.factor=3, that are "
-                 + "more suitable for production environments")
- private boolean productive;
  @CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1",
          description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.")
  private boolean volatileGroupInstanceId;
@@ -212,11 +207,9 @@ private ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final boolean clean
  private StreamsAppConfiguration createConfiguration() {
      final StreamsTopicConfig topics = this.createTopicConfig();
      final Map<String, String> kafkaConfig = this.getKafkaConfig();
-     final StreamsConfigurationOptions streamsOptions = this.createStreamsOptions();
      return StreamsAppConfiguration.builder()
              .topics(topics)
              .kafkaConfig(kafkaConfig)
-             .options(streamsOptions)
              .build();
  }

@@ -238,12 +231,6 @@ private ExecutableStreamsApp<StreamsApp> createExecutableApp(final boolean clean
      return configuredStreamsApp.withEndpoint(endpointConfig);
  }

- private StreamsConfigurationOptions createStreamsOptions() {
-     return StreamsConfigurationOptions.builder()
-             .productive(this.productive)
-             .build();
- }
-
  @RequiredArgsConstructor
  private static class RunningApp implements AutoCloseable {
      private final @NonNull ExecutableStreamsApp<StreamsApp> app;
@@ -165,7 +165,6 @@ private KafkaStreamsApplication createWordCountApplication() {
  final KafkaStreamsApplication application = new SimpleKafkaStreamsApplication(WordCount::new);
  application.setOutputTopic("word_output");
  application.setBrokers(this.kafkaCluster.getBrokerList());
- application.setProductive(false);
  application.setKafkaConfig(Map.of(
          StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0",
          ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
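With setProductive removed, a test or application that still wants the old productive replication setting passes it explicitly through the existing kafka config map. A minimal sketch, reusing the names from the test above (the wrapper class is hypothetical):

import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;

final class ReplicationExample {
    static KafkaStreamsApplication createProductionLikeApplication() {
        final KafkaStreamsApplication application = new SimpleKafkaStreamsApplication(WordCount::new);
        // replication.factor was previously raised to 3 by --productive;
        // now it is set explicitly like any other property
        application.setKafkaConfig(Map.of(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3"));
        return application;
    }
}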
@@ -55,6 +55,12 @@ private static Map<String, Object> createKafkaProperties(final KafkaEndpointConf
      kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  }

+ kafkaConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
+ kafkaConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+
+ // compression
+ kafkaConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
+
  return kafkaConfig;
  }

@@ -68,20 +74,26 @@ private static Map<String, Object> createKafkaProperties(final KafkaEndpointConf
  * {@link KafkaEndpointConfig#isSchemaRegistryConfigured()}.
  * If Schema Registry is configured, {@link SpecificAvroSerializer} is used, otherwise
  * {@link StringSerializer} is used.
- * </li>
- * <li>
- * Configs provided by {@link ProducerApp#createKafkaProperties()}
- * </li>
- * <li>
- * Configs provided via environment variables (see
- * {@link EnvironmentKafkaConfigParser#parseVariables(Map)})
- * </li>
- * <li>
- * Configs provided by {@link ProducerAppConfiguration#getKafkaConfig()}
- * </li>
- * <li>
- * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()}
- * </li>
+ * Additionally, the following is configured:
+ * <pre>
+ * max.in.flight.requests.per.connection=1
+ * acks=all
+ * compression.type=gzip
+ * </pre>
+ * </li>
+ * <li>
+ * Configs provided by {@link ProducerApp#createKafkaProperties()}
+ * </li>
+ * <li>
+ * Configs provided via environment variables (see
+ * {@link EnvironmentKafkaConfigParser#parseVariables(Map)})
+ * </li>
+ * <li>
+ * Configs provided by {@link ProducerAppConfiguration#getKafkaConfig()}
+ * </li>
+ * <li>
+ * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()}
+ * </li>
  * </ul>
  *
  * @return Kafka configuration
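Because these producer defaults are now applied before any app-provided properties, a later layer can still override them. A minimal sketch, assuming run(ProducerBuilder) is the only abstract method left on ProducerApp and that the brokers support zstd (the interface name is hypothetical):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;

// ProducerApp#createKafkaProperties is merged in after the baked-in defaults,
// so this entry replaces compression.type=gzip
interface ZstdProducerApp extends ProducerApp {
    @Override
    default Map<String, Object> createKafkaProperties() {
        final Map<String, Object> kafkaConfig = new HashMap<>(ProducerApp.super.createKafkaProperties());
        kafkaConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        return kafkaConfig;
    }
}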
@@ -31,6 +31,7 @@
  import lombok.Getter;
  import lombok.NonNull;
  import lombok.RequiredArgsConstructor;
+ import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.common.serialization.Serdes.StringSerde;
  import org.apache.kafka.streams.StreamsConfig;
  import org.apache.kafka.streams.Topology;
@@ -56,6 +57,15 @@ private static Map<String, Object> createKafkaProperties(final KafkaEndpointConf
      kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class);
  }

+ // exactly once and order
+ kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
+ kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1);
+
+ kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
+
+ // compression
+ kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip");
+
  return kafkaConfig;
  }

@@ -69,31 +79,38 @@ private static Map<String, Object> createKafkaProperties(final KafkaEndpointConf
  * {@link KafkaEndpointConfig#isSchemaRegistryConfigured()}.
  * If Schema Registry is configured, {@link SpecificAvroSerde} is used, otherwise {@link StringSerde} is
  * used.
- * </li>
- * <li>
- * Configs provided by {@link StreamsApp#createKafkaProperties(StreamsConfigurationOptions)}
- * </li>
- * <li>
- * Configs provided via environment variables (see
- * {@link EnvironmentKafkaConfigParser#parseVariables(Map)})
- * </li>
- * <li>
- * Configs provided by {@link StreamsAppConfiguration#getKafkaConfig()}
- * </li>
- * <li>
- * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()}
- * </li>
- * <li>
- * {@link StreamsConfig#APPLICATION_ID_CONFIG} is configured using
- * {@link StreamsApp#getUniqueAppId(StreamsTopicConfig)}
- * </li>
+ * Additionally, exactly-once, in-order, and compression are configured:
+ * <pre>
+ * processing.guarantee=exactly_once_v2
+ * producer.max.in.flight.requests.per.connection=1
+ * producer.acks=all
+ * producer.compression.type=gzip
+ * </pre>
+ * </li>
+ * <li>
+ * Configs provided by {@link StreamsApp#createKafkaProperties()}
+ * </li>
+ * <li>
+ * Configs provided via environment variables (see
+ * {@link EnvironmentKafkaConfigParser#parseVariables(Map)})
+ * </li>
+ * <li>
+ * Configs provided by {@link StreamsAppConfiguration#getKafkaConfig()}
+ * </li>
+ * <li>
+ * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()}
+ * </li>
+ * <li>
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG} is configured using
+ * {@link StreamsApp#getUniqueAppId(StreamsTopicConfig)}
+ * </li>
  * </ul>
  *
  * @return Kafka configuration
  */
  public Map<String, Object> getKafkaProperties(final KafkaEndpointConfig endpointConfig) {
      final Map<String, Object> kafkaConfig = createKafkaProperties(endpointConfig);
-     kafkaConfig.putAll(this.app.createKafkaProperties(this.configuration.getOptions()));
+     kafkaConfig.putAll(this.app.createKafkaProperties());
      kafkaConfig.putAll(EnvironmentKafkaConfigParser.parseVariables(System.getenv()));
      kafkaConfig.putAll(this.configuration.getKafkaConfig());
      kafkaConfig.putAll(endpointConfig.createKafkaProperties());
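The putAll order in getKafkaProperties means user-supplied configuration still wins over the hardcoded defaults above. A minimal sketch of that layering, using the StreamsAppConfiguration builder that appears later in this commit (the wrapper class is hypothetical):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

final class OverrideExample {
    static StreamsAppConfiguration zstdConfiguration() {
        // merged in after the baked-in defaults by ConfiguredStreamsApp#getKafkaProperties,
        // so this overrides producer.compression.type=gzip
        return StreamsAppConfiguration.builder()
                .kafkaConfig(Map.of(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "zstd"))
                .build();
    }
}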
37 changes: 5 additions & 32 deletions streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java
@@ -24,9 +24,9 @@

  package com.bakdata.kafka;

- import java.util.HashMap;
+ import static java.util.Collections.emptyMap;
+
  import java.util.Map;
- import org.apache.kafka.clients.producer.ProducerConfig;

  /**
   * Application that defines how to produce messages to Kafka and necessary configurations
@@ -49,38 +49,11 @@ default void setup(final ProducerAppSetupConfiguration configuration) {
  void run(ProducerBuilder builder);

  /**
- * <p>This method should give a default configuration to run your producer application with.</p>
- * To add a custom configuration, add a similar method to your custom application class:
- * <pre>{@code
- * public Map<String, Object> createKafkaProperties() {
- *     # Try to always use the kafka properties from the super class as base Map
- *     Map<String, Object> kafkaConfig = ProducerApp.super.createKafkaProperties();
- *     kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
- *     kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
- *     return kafkaConfig;
- * }
- * }</pre>
- *
- * Default configuration configures exactly-once, in-order, and compression:
- * <pre>
- * max.in.flight.requests.per.connection=1
- * acks=all
- * compression.type=gzip
- * </pre>
- *
- * @return Returns a default Kafka configuration
+ * This method should give a default configuration to run your producer application with.
+ * @return Returns a default Kafka configuration. Empty by default
  */
  default Map<String, Object> createKafkaProperties() {
-     final Map<String, Object> kafkaConfig = new HashMap<>();
-
-     // exactly once and order
-     kafkaConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
-     kafkaConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-
-     // compression
-     kafkaConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
-
-     return kafkaConfig;
+     return emptyMap();
  }

  /**
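The override pattern from the removed javadoc still applies; only the base map is now empty. A minimal sketch adapted from that example, assuming GenericAvroSerializer from Confluent's kafka-streams-avro-serde package (the interface name is hypothetical):

import io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;

// start from the (now empty) defaults of the interface and add Avro serializers
interface AvroProducerApp extends ProducerApp {
    @Override
    default Map<String, Object> createKafkaProperties() {
        final Map<String, Object> kafkaConfig = new HashMap<>(ProducerApp.super.createKafkaProperties());
        kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
        kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
        return kafkaConfig;
    }
}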
55 changes: 6 additions & 49 deletions streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java
@@ -24,16 +24,14 @@

  package com.bakdata.kafka;

- import java.util.HashMap;
+ import static java.util.Collections.emptyMap;
+
  import java.util.Map;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.streams.StreamsConfig;

  /**
   * Application that defines a Kafka Streams {@link org.apache.kafka.streams.Topology} and necessary configurations
   */
  public interface StreamsApp extends AutoCloseable {
-     int DEFAULT_PRODUCTIVE_REPLICATION_FACTOR = 3;

  /**
   * Setup Kafka resources, such as topics, before running this app
@@ -61,52 +59,11 @@ default void setup(final StreamsAppSetupConfiguration configuration) {
  String getUniqueAppId(StreamsTopicConfig topics);

  /**
- * <p>This method should give a default configuration to run your streaming application with.</p>
- * To add a custom configuration, add a similar method to your custom application class:
- * <pre>{@code
- * protected Map<String, Object> createKafkaProperties(StreamsOptions options) {
- *     # Try to always use the kafka properties from the super class as base Map
- *     Map<String, Object> kafkaConfig = StreamsApp.super.createKafkaProperties(options);
- *     kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
- *     kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
- *     return kafkaConfig;
- * }
- * }</pre>
- *
- * Default configuration configures exactly-once, in-order, and compression:
- * <pre>
- * processing.guarantee=exactly_once_v2
- * producer.max.in.flight.requests.per.connection=1
- * producer.acks=all
- * producer.compression.type=gzip
- * </pre>
- *
- * If {@link StreamsConfigurationOptions#isProductive()} is set the following is configured additionally:
- * <pre>
- * replication.factor=3
- * </pre>
- *
- * @param options options to dynamically configure
- * @return Returns a default Kafka Streams configuration
+ * This method should give a default configuration to run your streaming application with.
+ * @return Returns a default Kafka Streams configuration. Empty by default
  */
- default Map<String, Object> createKafkaProperties(final StreamsConfigurationOptions options) {
-     final Map<String, Object> kafkaConfig = new HashMap<>();
-
-     // exactly once and order
-     kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
-     kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1);
-
-     // resilience
-     if (options.isProductive()) {
-         kafkaConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, DEFAULT_PRODUCTIVE_REPLICATION_FACTOR);
-     }
-
-     kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
-
-     // compression
-     kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip");
-
-     return kafkaConfig;
+ default Map<String, Object> createKafkaProperties() {
+     return emptyMap();
  }

  /**
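Likewise for Streams apps: the removed --productive branch boiled down to replication.factor=3, which an application can now opt into itself. A minimal sketch adapted from the removed javadoc example and the deleted productive branch, assuming GenericAvroSerde from Confluent's kafka-streams-avro-serde package (the interface name is hypothetical):

import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;

// replication.factor=3 restores what the deleted productive branch used to configure
interface ProductionStreamsApp extends StreamsApp {
    @Override
    default Map<String, Object> createKafkaProperties() {
        final Map<String, Object> kafkaConfig = new HashMap<>(StreamsApp.super.createKafkaProperties());
        kafkaConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        return kafkaConfig;
    }
}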
@@ -38,6 +38,4 @@ public class StreamsAppConfiguration {
  @NonNull StreamsTopicConfig topics = StreamsTopicConfig.builder().build();
  @Builder.Default
  @NonNull Map<String, ?> kafkaConfig = emptyMap();
- @Builder.Default
- @NonNull StreamsConfigurationOptions options = StreamsConfigurationOptions.builder().build();
  }