Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 15, 2024
1 parent ccf4ce5 commit 2b41117
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
* @param <R> type of {@link Runner} used by this app
* @param <CR> type of {@link CleanUpRunner} used by this app
* @param <O> type of options to create runner
* @param <E> type of {@link ExecutableApp} used by this app
* @param <CA> type of {@link ConfiguredApp} used by this app
*/
@ToString
@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public final Optional<ProducerExecutionOptions> createExecutionOptions() {
return Optional.empty();
}

/**
* Topics used by {@link ProducerApp}
* @return {@code ProducerTopicConfig}
*/
public final ProducerTopicConfig createTopicConfig() {
return ProducerTopicConfig.builder()
.outputTopic(this.getOutputTopic())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ public final Optional<StreamsExecutionOptions> createExecutionOptions() {
return Optional.of(options);
}

/**
* Topics used by {@link StreamsApp}
* @return {@code StreamsTopicConfig}
*/
public final StreamsTopicConfig createTopicConfig() {
return StreamsTopicConfig.builder()
.inputTopics(this.inputTopics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ public static Function<String, Map<String, Object>> getKafkaPropertiesWithSchema
};
}

/**
* Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and
* {@link org.apache.kafka.common.serialization.Serializer} using the {@code TestTopology} properties.
* @param testTopology {@code TestTopology} to use properties of
* @return {@code Configurator}
* @see TestTopology#getProperties()
*/
public static Configurator createConfigurator(final TestTopology<?, ?> testTopology) {
return new Configurator(testTopology.getProperties());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,11 @@

@FunctionalInterface
interface Configurable<T> {
/**
* Configure this class
* @param config configs in key/value pairs
* @param isKey whether is for key or value
* @return configured instance
*/
T configure(Map<String, Object> config, boolean isKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

/**
* Configure {@link Serde} and {@link Serializer} using base properties
*/
@RequiredArgsConstructor
public class Configurator {

Expand Down Expand Up @@ -124,6 +127,12 @@ public <T> Serializer<T> configureForKeys(final Serializer<T> serializer,
return this.configure(key(serializer, configOverrides));
}

/**
* Configure a {@code PreConfigured} object using {@link #kafkaProperties}
* @param preConfigured pre-configured {@link Serde} or {@link Serializer}
* @return configured instance
* @param <T> type of configured instance
*/
public <T> T configure(final PreConfigured<T> preConfigured) {
return preConfigured.configure(this.kafkaProperties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,4 @@ private EffectiveProducerAppConfiguration createEffectiveConfiguration() {
.kafkaProperties(this.kafkaProperties)
.build();
}

private void setup(final EffectiveProducerAppConfiguration configuration) {
this.app.setup(configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,50 +25,107 @@
package com.bakdata.kafka;

import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;

import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import lombok.AccessLevel;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

/**
* A pre-configured {@link Serde} or {@link Serializer}, i.e., configs and isKey are set.
* @param <T> type of underlying configurable
*/
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
public final class PreConfigured<T> {
private final @NonNull Configurable<T> configurable;
private final @NonNull Map<String, Object> configOverrides;
private final boolean isKey;

/**
* Pre-configure a {@code Serde} for values
* @param serde {@code Serde} to pre-configure
* @return pre-configured serde
* @param <T> type (de-)serialized by the {@code Serde}
*/
public static <T> PreConfigured<Serde<T>> value(final Serde<T> serde) {
return value(configurable(serde));
}

/**
* Pre-configure a {@code Serde} for values with config overrides
* @param serde {@code Serde} to pre-configure
* @param configOverrides configs passed to {@link Serde#configure(Map, boolean)}
* @return pre-configured serde
* @param <T> type (de-)serialized by the {@code Serde}
*/
public static <T> PreConfigured<Serde<T>> value(final Serde<T> serde, final Map<String, Object> configOverrides) {
return value(configurable(serde), configOverrides);
}

/**
* Pre-configure a {@code Serde} for keys
* @param serde {@code Serde} to pre-configure
* @return pre-configured serde
* @param <T> type (de-)serialized by the {@code Serde}
*/
public static <T> PreConfigured<Serde<T>> key(final Serde<T> serde) {
return key(configurable(serde));
}

/**
* Pre-configure a {@code Serde} for keys with config overrides
* @param serde {@code Serde} to pre-configure
* @param configOverrides configs passed to {@link Serde#configure(Map, boolean)}
* @return pre-configured serde
* @param <T> type (de-)serialized by the {@code Serde}
*/
public static <T> PreConfigured<Serde<T>> key(final Serde<T> serde, final Map<String, Object> configOverrides) {
return key(configurable(serde), configOverrides);
}

/**
* Pre-configure a {@code Serializer} for values
* @param serializer {@code Serializer} to pre-configure
* @return pre-configured serializer
* @param <T> type serialized by the {@code Serializer}
*/
public static <T> PreConfigured<Serializer<T>> value(final Serializer<T> serializer) {
return value(configurable(serializer));
}

/**
* Pre-configure a {@code Serializer} for values
* @param serializer {@code Serializer} to pre-configure
* @param configOverrides configs passed to {@link Serializer#configure(Map, boolean)}
* @return pre-configured serializer
* @param <T> type serialized by the {@code Serializer}
*/
public static <T> PreConfigured<Serializer<T>> value(final Serializer<T> serializer,
final Map<String, Object> configOverrides) {
return value(configurable(serializer), configOverrides);
}

/**
* Pre-configure a {@code Serializer} for keys
* @param serializer {@code Serializer} to pre-configure
* @return pre-configured serializer
* @param <T> type serialized by the {@code Serializer}
*/
public static <T> PreConfigured<Serializer<T>> key(final Serializer<T> serializer) {
return key(configurable(serializer));
}

/**
* Pre-configure a {@code Serializer} for keys
* @param serializer {@code Serializer} to pre-configure
* @param configOverrides configs passed to {@link Serializer#configure(Map, boolean)}
* @return pre-configured serializer
* @param <T> type serialized by the {@code Serializer}
*/
public static <T> PreConfigured<Serializer<T>> key(final Serializer<T> serializer,
final Map<String, Object> configOverrides) {
return key(configurable(serializer), configOverrides);
Expand Down Expand Up @@ -106,10 +163,9 @@ public T configure(final Map<String, Object> baseConfig) {
}

private Map<String, Object> mergeConfig(final Map<String, Object> baseConfig) {
return ImmutableMap.<String, Object>builder()
.putAll(baseConfig)
.putAll(this.configOverrides)
.build();
final Map<String, Object> config = new HashMap<>(baseConfig);
config.putAll(this.configOverrides);
return unmodifiableMap(config);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,19 @@ public <K, V> Producer<K, V> createProducer(final Serializer<K> keySerializer,
return new KafkaProducer<>(this.kafkaProperties, keySerializer, valueSerializer);
}

/**
* Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and
* {@link org.apache.kafka.common.serialization.Serializer} using {@link #kafkaProperties}.
* @return {@code Configurator}
*/
public Configurator createConfigurator() {
return new Configurator(this.kafkaProperties);
}

/**
* Create {@code EffectiveProducerAppConfiguration} used by this app
* @return {@code EffectiveProducerAppConfiguration}
*/
public EffectiveProducerAppConfiguration createEffectiveConfiguration() {
return EffectiveProducerAppConfiguration.builder()
.kafkaProperties(this.kafkaProperties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,19 @@ public <K, V> KStream<K, V> streamInputPattern(final String role) {
return this.streamsBuilder.stream(this.topics.getInputPattern(role));
}

/**
* Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and
* {@link org.apache.kafka.common.serialization.Serializer} using {@link #kafkaProperties}.
* @return {@code Configurator}
*/
public Configurator createConfigurator() {
return new Configurator(this.kafkaProperties);
}

/**
* Create {@code EffectiveStreamsAppConfiguration} used by this app
* @return {@code EffectiveStreamsAppConfiguration}
*/
public EffectiveStreamsAppConfiguration createEffectiveConfiguration() {
return EffectiveStreamsAppConfiguration.builder()
.kafkaProperties(this.kafkaProperties)
Expand Down
Loading

0 comments on commit 2b41117

Please sign in to comment.