Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 3, 2024
1 parent 497bbe7 commit fa9a1f1
Show file tree
Hide file tree
Showing 19 changed files with 239 additions and 159 deletions.
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
version=2.17.1-SNAPSHOT
org.gradle.caching=true
# running Kafka JUnit in parallel causes problems
org.gradle.parallel=false
kafkaVersion=3.6.1
kafkaJunitVersion=3.6.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka;

import com.bakdata.kafka.ConfiguredProducerApp.ExecutableProducerApp;
import java.util.Map;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -60,10 +59,10 @@ public void clean() {
cleanUpRunner.clean();
}

public ConfiguredProducerApp createConfiguredApp() {
public ConfiguredProducerApp<ProducerApp> createConfiguredApp() {
final ProducerApp producerApp = this.createApp();
final ProducerAppConfiguration configuration = this.createConfiguration();
return new ConfiguredProducerApp(producerApp, configuration);
return new ConfiguredProducerApp<ProducerApp>(producerApp, configuration);
}

public ProducerAppConfiguration createConfiguration() {
Expand All @@ -85,17 +84,17 @@ public ProducerTopicConfig createTopicConfig() {
public abstract ProducerApp createApp();

public ProducerRunner createRunner() {
final ExecutableProducerApp executableApp = this.createExecutableApp();
final ExecutableProducerApp<ProducerApp> executableApp = this.createExecutableApp();
return executableApp.createRunner();
}

public ProducerCleanUpRunner createCleanUpRunner() {
final ExecutableProducerApp executableApp = this.createExecutableApp();
final ExecutableProducerApp<ProducerApp> executableApp = this.createExecutableApp();
return executableApp.createCleanUpRunner();
}

private ExecutableProducerApp createExecutableApp() {
final ConfiguredProducerApp app = this.createConfiguredApp();
private ExecutableProducerApp<ProducerApp> createExecutableApp() {
final ConfiguredProducerApp<ProducerApp> app = this.createConfiguredApp();
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return app.withEndpoint(endpointConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka;

import com.bakdata.kafka.ConfiguredStreamsApp.ExecutableStreamsApp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -74,6 +73,8 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement
description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.")
private boolean volatileGroupInstanceId;
@ToString.Exclude
// ConcurrentLinkedDeque required because calling #close() causes asynchronous #run() calls to finish and thus
// concurrently iterating on #runners and removing from #runners
private ConcurrentLinkedDeque<StreamsRunner> runners = new ConcurrentLinkedDeque<>();

/**
Expand Down Expand Up @@ -108,7 +109,7 @@ public void reset() {
runner.reset();
}

public abstract StreamsApp createApp();
public abstract StreamsApp createApp(boolean cleanUp);

public StreamsExecutionOptions createExecutionOptions() {
return StreamsExecutionOptions.builder()
Expand All @@ -117,20 +118,20 @@ public StreamsExecutionOptions createExecutionOptions() {
}

public StreamsRunner createRunner() {
final ExecutableStreamsApp executableStreamsApp = this.createExecutableApp();
final ExecutableStreamsApp<StreamsApp> executableStreamsApp = this.createExecutableApp(false);
final StreamsExecutionOptions executionOptions = this.createExecutionOptions();
return executableStreamsApp.createRunner(executionOptions);
}

public StreamsCleanUpRunner createCleanUpRunner() {
final ExecutableStreamsApp executableApp = this.createExecutableApp();
final ExecutableStreamsApp<StreamsApp> executableApp = this.createExecutableApp(true);
return executableApp.createCleanUpRunner();
}

public ConfiguredStreamsApp createConfiguredApp() {
final StreamsApp streamsApp = this.createApp();
public ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final boolean cleanUp) {
final StreamsApp streamsApp = this.createApp(cleanUp);
final StreamsAppConfiguration streamsAppConfiguration = this.createConfiguration();
return new ConfiguredStreamsApp(streamsApp, streamsAppConfiguration);
return new ConfiguredStreamsApp<StreamsApp>(streamsApp, streamsAppConfiguration);
}

public StreamsAppConfiguration createConfiguration() {
Expand All @@ -156,8 +157,8 @@ public StreamsTopicConfig createTopicConfig() {
.build();
}

public ExecutableStreamsApp createExecutableApp() {
final ConfiguredStreamsApp configuredStreamsApp = this.createConfiguredApp();
public ExecutableStreamsApp<StreamsApp> createExecutableApp(final boolean cleanUp) {
final ConfiguredStreamsApp<StreamsApp> configuredStreamsApp = this.createConfiguredApp(cleanUp);
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return configuredStreamsApp.withEndpoint(endpointConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class SimpleKafkaStreamsApplication extends KafkaStreamsApplication {
private final @NonNull Supplier<StreamsApp> appFactory;

@Override
public StreamsApp createApp() {
public StreamsApp createApp(final boolean cleanUp) {
return this.appFactory.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private static void runApp(final KafkaStreamsApplication app, final String... ar
void shouldExitWithSuccessCode() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
@Override
public StreamsApp createApp() {
public StreamsApp createApp(final boolean cleanUp) {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down Expand Up @@ -105,7 +105,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) {
void shouldExitWithErrorCodeOnCleanupError() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
@Override
public StreamsApp createApp() {
public StreamsApp createApp(final boolean cleanUp) {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down Expand Up @@ -137,7 +137,7 @@ public void clean() {
void shouldExitWithErrorCodeOnMissingBrokerParameter() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
@Override
public StreamsApp createApp() {
public StreamsApp createApp(final boolean cleanUp) {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down Expand Up @@ -239,7 +239,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) {
void shouldExitWithErrorOnCleanupError() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
@Override
public StreamsApp createApp() {
public StreamsApp createApp(final boolean cleanUp) {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand All @@ -265,7 +265,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) {
void shouldParseArguments() {
final KafkaStreamsApplication app = new KafkaStreamsApplication() {
@Override
public StreamsApp createApp() {
public StreamsApp createApp(final boolean cleanUp) {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class StreamsBootstrapTopologyFactory {
* @see ConfiguredStreamsApp#createTopology(Map)
*/
public static <K, V> TestTopology<K, V> createTopologyWithSchemaRegistry(
final ConfiguredStreamsApp app) {
final ConfiguredStreamsApp<? extends StreamsApp> app) {
return new TestTopology<>(app::createTopology, getKafkaPropertiesWithSchemaRegistryUrl(app));
}

Expand All @@ -66,7 +66,7 @@ public static <K, V> TestTopology<K, V> createTopologyWithSchemaRegistry(
* @see ConfiguredStreamsApp#createTopology(Map)
*/
public static <K, V> TestTopologyExtension<K, V> createTopologyExtensionWithSchemaRegistry(
final ConfiguredStreamsApp app) {
final ConfiguredStreamsApp<? extends StreamsApp> app) {
return new TestTopologyExtension<>(app::createTopology, getKafkaPropertiesWithSchemaRegistryUrl(app));
}

Expand All @@ -81,7 +81,7 @@ public static <K, V> TestTopologyExtension<K, V> createTopologyExtensionWithSche
* @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig)
* @see ConfiguredStreamsApp#createTopology(Map)
*/
public static <K, V> TestTopology<K, V> createTopology(final ConfiguredStreamsApp app) {
public static <K, V> TestTopology<K, V> createTopology(final ConfiguredStreamsApp<? extends StreamsApp> app) {
return new TestTopology<>(app::createTopology, getKafkaProperties(app));
}

Expand All @@ -98,7 +98,7 @@ public static <K, V> TestTopology<K, V> createTopology(final ConfiguredStreamsAp
* @see ConfiguredStreamsApp#createTopology(Map)
*/
public static <K, V> TestTopologyExtension<K, V> createTopologyExtension(
final ConfiguredStreamsApp app) {
final ConfiguredStreamsApp<? extends StreamsApp> app) {
return new TestTopologyExtension<>(app::createTopology, getKafkaProperties(app));
}

Expand All @@ -111,7 +111,7 @@ public static <K, V> TestTopologyExtension<K, V> createTopologyExtension(
* @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig)
*/
public static Function<String, Map<String, Object>> getKafkaPropertiesWithSchemaRegistryUrl(
final ConfiguredStreamsApp app) {
final ConfiguredStreamsApp<? extends StreamsApp> app) {
return schemaRegistryUrl -> {
final KafkaEndpointConfig endpointConfig = newEndpointConfig()
.schemaRegistryUrl(schemaRegistryUrl)
Expand All @@ -120,7 +120,7 @@ public static Function<String, Map<String, Object>> getKafkaPropertiesWithSchema
};
}

private static Map<String, Object> getKafkaProperties(final ConfiguredStreamsApp app) {
private static Map<String, Object> getKafkaProperties(final ConfiguredStreamsApp<? extends StreamsApp> app) {
final KafkaEndpointConfig endpointConfig = createEndpointConfig();
return app.getKafkaProperties(endpointConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

@RequiredArgsConstructor
public class ConfiguredProducerApp {
private final @NonNull ProducerApp app;
public class ConfiguredProducerApp<T extends ProducerApp> {
@Getter
private final @NonNull T app;
private final @NonNull ProducerAppConfiguration configuration;

private static Map<String, Object> createKafkaProperties(final KafkaEndpointConfig endpointConfig) {
Expand Down Expand Up @@ -87,31 +89,12 @@ public Map<String, Object> getKafkaProperties(final KafkaEndpointConfig endpoint
return kafkaConfig;
}

public ExecutableProducerApp withEndpoint(final KafkaEndpointConfig endpointConfig) {
return new ExecutableProducerApp(endpointConfig);
public ExecutableProducerApp<T> withEndpoint(final KafkaEndpointConfig endpointConfig) {
return new ExecutableProducerApp<>(this.getTopics(), this.getKafkaProperties(endpointConfig), this.app);
}

@RequiredArgsConstructor
public class ExecutableProducerApp {
private final @NonNull KafkaEndpointConfig endpointConfig;

public ProducerCleanUpRunner createCleanUpRunner() {
final Map<String, Object> kafkaProperties =
ConfiguredProducerApp.this.getKafkaProperties(this.endpointConfig);
final ProducerCleanUpConfigurer configurer = new ProducerCleanUpConfigurer();
ConfiguredProducerApp.this.app.setupCleanUp(configurer);
return ProducerCleanUpRunner.create(ConfiguredProducerApp.this.configuration.getTopics(), kafkaProperties,
configurer);
public ProducerTopicConfig getTopics() {
return this.configuration.getTopics();
}

public ProducerRunner createRunner() {
final Map<String, Object> kafkaProperties =
ConfiguredProducerApp.this.getKafkaProperties(this.endpointConfig);
final ProducerBuilder producerBuilder = ProducerBuilder.builder()
.topics(ConfiguredProducerApp.this.configuration.getTopics())
.kafkaProperties(kafkaProperties)
.build();
return new ProducerRunner(() -> ConfiguredProducerApp.this.app.run(producerBuilder));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

@RequiredArgsConstructor
public class ConfiguredStreamsApp {
private final @NonNull StreamsApp app;
public class ConfiguredStreamsApp<T extends StreamsApp> {
@Getter
private final @NonNull T app;
private final @NonNull StreamsAppConfiguration configuration;

private static Map<String, Object> createKafkaProperties(final KafkaEndpointConfig endpointConfig) {
Expand Down Expand Up @@ -89,75 +91,33 @@ public Map<String, Object> getKafkaProperties(final KafkaEndpointConfig endpoint
kafkaConfig.putAll(this.configuration.getKafkaConfig());
kafkaConfig.putAll(EnvironmentKafkaConfigParser.parseVariables(System.getenv()));
kafkaConfig.putAll(endpointConfig.createKafkaProperties());
kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, this.app.getUniqueAppId(this.configuration.getTopics()));
kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, this.app.getUniqueAppId(this.getTopics()));
return kafkaConfig;
}

public Topology createTopology(final Map<String, Object> kafkaProperties) {
return this.createTopology(kafkaProperties, false);
public StreamsTopicConfig getTopics() {
return this.configuration.getTopics();
}

public ExecutableStreamsApp withEndpoint(final KafkaEndpointConfig endpointConfig) {
return new ExecutableStreamsApp(endpointConfig);
public ExecutableStreamsApp<T> withEndpoint(final KafkaEndpointConfig endpointConfig) {
final Map<String, Object> kafkaProperties = this.getKafkaProperties(endpointConfig);
final Topology topology = this.createTopology(kafkaProperties);
return new ExecutableStreamsApp<T>(topology, new StreamsConfig(kafkaProperties), this.app);
}

/**
* Create the topology of the Kafka Streams app
*
* @return topology of the Kafka Streams app
* @param kafkaProperties configuration that should be used by clients to configure Kafka utilities
* @param cleanUp whether topology is created in cleanUp context
* @return topology of the Kafka Streams app
*/
private Topology createTopology(final Map<String, Object> kafkaProperties, final boolean cleanUp) {
public Topology createTopology(final Map<String, Object> kafkaProperties) {
final TopologyBuilder topologyBuilder = TopologyBuilder.builder()
.topics(this.configuration.getTopics())
.topics(this.getTopics())
.kafkaProperties(kafkaProperties)
.cleanUp(cleanUp)
.build();
this.app.buildTopology(topologyBuilder);
return topologyBuilder.build();
}

@RequiredArgsConstructor
public class ExecutableStreamsApp {

private final @NonNull KafkaEndpointConfig endpointConfig;

public StreamsCleanUpRunner createCleanUpRunner() {
final Map<String, Object> kafkaProperties =
ConfiguredStreamsApp.this.getKafkaProperties(this.endpointConfig);
final Topology topology = ConfiguredStreamsApp.this.createTopology(kafkaProperties, true);
final StreamsCleanUpConfigurer configurer = new StreamsCleanUpConfigurer();
ConfiguredStreamsApp.this.app.setupCleanUp(configurer);
configurer.registerFinishHook(ConfiguredStreamsApp.this.app::close);
return StreamsCleanUpRunner.create(topology, kafkaProperties, configurer);
}

public StreamsRunner createRunner() {
return this.createRunner(StreamsExecutionOptions.builder().build());
}

public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions) {
final Map<String, Object> kafkaProperties =
ConfiguredStreamsApp.this.getKafkaProperties(this.endpointConfig);
final Topology topology = ConfiguredStreamsApp.this.createTopology(kafkaProperties);
return StreamsRunner.builder()
.topology(topology)
.config(new StreamsConfig(kafkaProperties))
.executionOptions(executionOptions)
.hooks(this.createHooks())
.build();
}

private StreamsHooks createHooks() {
return StreamsHooks.builder()
.stateListener(ConfiguredStreamsApp.this.app.getStateListener())
.uncaughtExceptionHandler(ConfiguredStreamsApp.this.app.getUncaughtExceptionHandler())
.onStart(ConfiguredStreamsApp.this.app::onStreamsStart)
.onShutdown(ConfiguredStreamsApp.this.app::close)
.build();
}

}

}
Loading

0 comments on commit fa9a1f1

Please sign in to comment.