Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 4, 2024
1 parent c96c8af commit 33d841a
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,12 @@ public StreamsTopicConfig getTopics() {
public ExecutableStreamsApp<T> withEndpoint(final KafkaEndpointConfig endpointConfig) {
final Map<String, Object> kafkaProperties = this.getKafkaProperties(endpointConfig);
final Topology topology = this.createTopology(kafkaProperties);
return new ExecutableStreamsApp<>(topology, new StreamsConfig(kafkaProperties), this.app);
return ExecutableStreamsApp.<T>builder()
.topology(topology)
.streamsConfig(new StreamsConfig(kafkaProperties))
.app(this.app)
.setup(() -> this.setupApp(kafkaProperties))
.build();
}

/**
Expand All @@ -139,4 +144,12 @@ public void close() {
this.app.close();
}

private void setupApp(final Map<String, Object> kafkaProperties) {
final StreamsAppSetupConfiguration configuration = StreamsAppSetupConfiguration.builder()
.kafkaProperties(kafkaProperties)
.topics(this.getTopics())
.build();
this.app.setup(configuration);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,20 @@ public ProducerRunner createRunner() {
.topics(this.topics)
.kafkaProperties(this.kafkaProperties)
.build();
this.setup();
return new ProducerRunner(() -> this.app.run(producerBuilder));
}

@Override
public void close() {
this.app.close();
}

private void setup() {
final ProducerAppSetupConfiguration configuration = ProducerAppSetupConfiguration.builder()
.topics(this.topics)
.kafkaProperties(this.kafkaProperties)
.build();
this.app.setup(configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,29 @@

package com.bakdata.kafka;

import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

/**
* A {@link StreamsApp} with a corresponding {@link Topology} and {@link StreamsConfig}
* @param <T> type of {@link ProducerApp}
*/
@RequiredArgsConstructor
@Builder(access = AccessLevel.PACKAGE)
@Getter
public class ExecutableStreamsApp<T extends StreamsApp> implements AutoCloseable {

@Getter
private final @NonNull Topology topology;
@Getter
private final @NonNull StreamsConfig streamsConfig;
@Getter
private final @NonNull T app;
@Builder.Default
private final @NonNull Runnable setup = () -> {};

/**
* Create {@code StreamsCleanUpRunner} in order to clean application
Expand All @@ -67,6 +73,7 @@ public StreamsRunner createRunner() {
* @see StreamsRunner#StreamsRunner(Topology, StreamsConfig, StreamsExecutionOptions)
*/
public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions) {
this.setup.run();
return new StreamsRunner(this.topology, this.streamsConfig, executionOptions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
@FunctionalInterface
public interface ProducerApp extends AutoCloseable {

default void setup(final ProducerAppSetupConfiguration configuration) {
// do nothing by default
}

/**
* Called when running the producer
* @param builder provides all runtime application configurations
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import static java.util.Collections.emptyMap;

import com.bakdata.kafka.util.ImprovedAdminClient;
import java.util.Map;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;

@Builder
@Value
public class ProducerAppSetupConfiguration {
@Builder.Default
@NonNull ProducerTopicConfig topics = ProducerTopicConfig.builder().build();
@Builder.Default
@NonNull Map<String, Object> kafkaProperties = emptyMap();

/**
* Create a new {@code ImprovedAdminClient} using {@link #kafkaProperties}
* @return {@code ImprovedAdminClient}
*/
public ImprovedAdminClient createAdminClient() {
return ImprovedAdminClient.create(this.kafkaProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka;

import com.bakdata.kafka.util.ImprovedAdminClient;
import java.util.Map;
import lombok.AccessLevel;
import lombok.Builder;
Expand Down Expand Up @@ -70,12 +69,4 @@ public <K, V> Producer<K, V> createProducer(final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
return new KafkaProducer<>(this.kafkaProperties, keySerializer, valueSerializer);
}

/**
* Create a new {@code ImprovedAdminClient} using {@link #kafkaProperties}
* @return {@code ImprovedAdminClient}
*/
public ImprovedAdminClient createAdminClient() {
return ImprovedAdminClient.create(this.kafkaProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
public interface StreamsApp extends AutoCloseable {
int DEFAULT_PRODUCTIVE_REPLICATION_FACTOR = 3;

default void setup(final StreamsAppSetupConfiguration configuration) {
// do nothing by default
}

/**
* Build the Kafka Streams {@link org.apache.kafka.streams.Topology} to be run by the app.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import static java.util.Collections.emptyMap;

import com.bakdata.kafka.util.ImprovedAdminClient;
import java.util.Map;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;

@Builder
@Value
public class StreamsAppSetupConfiguration {
@Builder.Default
@NonNull StreamsTopicConfig topics = StreamsTopicConfig.builder().build();
@Builder.Default
@NonNull Map<String, Object> kafkaProperties = emptyMap();

/**
* Create a new {@code ImprovedAdminClient} using {@link #kafkaProperties}
* @return {@code ImprovedAdminClient}
*/
public ImprovedAdminClient createAdminClient() {
return ImprovedAdminClient.create(this.kafkaProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka;

import com.bakdata.kafka.util.ImprovedAdminClient;
import java.util.Map;
import lombok.AccessLevel;
import lombok.Builder;
Expand Down Expand Up @@ -136,14 +135,6 @@ public <K, V> KStream<K, V> streamInputPattern(final String role) {
return this.streamsBuilder.stream(this.topics.getInputPattern(role));
}

/**
* Create a new {@code ImprovedAdminClient} using {@link #kafkaProperties}
* @return {@code ImprovedAdminClient}
*/
public ImprovedAdminClient createAdminClient() {
return ImprovedAdminClient.create(this.kafkaProperties);
}

Topology build() {
return this.streamsBuilder.build();
}
Expand Down

0 comments on commit 33d841a

Please sign in to comment.