Skip to content

Commit

Permalink
Separate CLI and application (#207)
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 authored Jul 19, 2024
1 parent 1a0f2a4 commit 3c8880c
Show file tree
Hide file tree
Showing 137 changed files with 7,487 additions and 2,821 deletions.
115 changes: 70 additions & 45 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ You can add streams-bootstrap via Maven Central.
#### Gradle

```gradle
compile group: 'com.bakdata.kafka', name: 'streams-bootstrap', version: '2.1.1'
implementation group: 'com.bakdata.kafka', name: 'streams-bootstrap-cli', version: '3.0.0'
```

With Kotlin DSL

```gradle
implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", version = "3.0.0")
```

#### Maven
Expand All @@ -35,8 +41,8 @@ compile group: 'com.bakdata.kafka', name: 'streams-bootstrap', version: '2.1.1'

<dependency>
<groupId>com.bakdata.kafka</groupId>
<artifactId>streams-bootstrap</artifactId>
<version>2.1.1</version>
<artifactId>streams-bootstrap-cli</artifactId>
<version>3.0.0</version>
</dependency>
```

Expand All @@ -52,8 +58,10 @@ and `getUniqueAppId()`. You can define the topology of your application in `buil

```java
import com.bakdata.kafka.KafkaStreamsApplication;
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import java.util.Map;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsBootstrapApplication extends KafkaStreamsApplication {
Expand All @@ -62,26 +70,30 @@ public class StreamsBootstrapApplication extends KafkaStreamsApplication {
}

@Override
public void buildTopology(final StreamsBuilder builder) {
final KStream<String, String> input =
builder.<String, String>stream(this.getInputTopics());
public StreamsApp createApp(final boolean cleanUp) {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
final KStream<String, String> input = builder.streamInput();

// your topology
// your topology

input.to(this.getOutputTopic());
}

@Override
public String getUniqueAppId() {
return "streams-bootstrap-app";
}
input.to(builder.getTopics().getOutputTopic());
}

// Optionally you can override the default streams bootstrap Kafka properties
@Override
protected Properties createKafkaProperties() {
final Properties kafkaProperties = super.createKafkaProperties();
@Override
public String getUniqueAppId(final StreamsTopicConfig topics) {
return "streams-bootstrap-app-" + topics.getOutputTopic();
}

return kafkaProperties;
// Optionally you can define custom Kafka properties
@Override
public Map<String, Object> createKafkaProperties() {
return Map.of(
// your config
);
}
};
}
}
```
Expand All @@ -92,6 +104,8 @@ The following configuration options are available:

- `--schema-registry-url`: The URL of the Schema Registry

- `--kafka-config`: Kafka Streams configuration (`<String=String>[,<String=String>...]`)

- `--input-topics`: List of input topics (comma-separated)

- `--input-pattern`: Pattern of input topics
Expand All @@ -100,8 +114,6 @@ The following configuration options are available:

- `--error-topic`: A topic to write errors to

- `--streams-config`: Kafka Streams configuration (`<String=String>[,<String=String>...]`)

- `--extra-input-topics`: Additional named input topics if you need to specify multiple topics with different message
types (`<String=String>[,<String=String>...]`)

Expand All @@ -113,41 +125,53 @@ The following configuration options are available:

- `--volatile-group-instance-id`: Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.

- `--clean-up`: Whether the state of the Kafka Streams app, i.e., offsets and state stores and auto-created topics,
should be cleared instead of running the app
- `--debug`: Configure logging to debug

Additionally, the following commands are available:

- `--delete-output`: Whether the output topics with their associated schemas and the consumer group should be deleted
during the cleanup
- `clean`: Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate
topics associated with the Kafka Streams application.

- `--debug`: Configure logging to debug
- `reset`: Clear all state stores, consumer group offsets, and internal topics associated with the Kafka Streams
application.

#### Kafka producer

Create a subclass of `KafkaProducerApplication`.

```java
import com.bakdata.kafka.KafkaProducerApplication;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.ProducerRunnable;
import java.util.Map;
import org.apache.kafka.clients.producer.Producer;

public class StreamsBootstrapApplication extends KafkaProducerApplication {
public static void main(final String[] args) {
startApplication(new StreamsBootstrapApplication(), args);
}

@Override
protected void runApplication() {
try (final KafkaProducer<Object, Object> producer = this.createProducer()) {
// your producer
public ProducerApp createApp(final boolean cleanUp) {
return new ProducerApp() {
@Override
public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
return () -> {
try (final Producer<Object, Object> producer = builder.createProducer()) {
// your producer
}
};
}
}

// Optionally you can override the default streams bootstrap Kafka properties
@Override
protected Properties createKafkaProperties() {
final Properties kafkaProperties = super.createKafkaProperties();

return kafkaProperties;
// Optionally you can define custom Kafka properties
@Override
public Map<String, Object> createKafkaProperties() {
return Map.of(
// your config
);
}
};
}
}
```
Expand All @@ -158,17 +182,18 @@ The following configuration options are available:

- `--schema-registry-url`: The URL of the Schema Registry

- `--output-topic`: The output topic
- `--kafka-config`: Kafka producer configuration (`<String=String>[,<String=String>...]`)

- `--streams-config`: Kafka producer configuration (`<String=String>[,<String=String>...]`)
- `--output-topic`: The output topic

- `--extra-output-topics`: Additional named output topics (`<String=String>[,<String=String>...]`)

- `--clean-up`: Whether the output topics and associated schemas of the producer app should be deleted instead of
running the app

- `--debug`: Configure logging to debug

Additionally, the following commands are available:

- `clean`: Delete all output topics associated with the Kafka Producer application.

### Helm Charts

For the configuration and deployment to Kubernetes, you can use
Expand Down
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
plugins {
id("com.bakdata.release") version "1.4.0"
id("com.bakdata.sonar") version "1.4.0"
id("com.bakdata.sonatype") version "1.4.0"
id("com.bakdata.sonatype") version "1.4.1"
id("io.freefair.lombok") version "8.4"
}

Expand All @@ -16,6 +16,7 @@ allprojects {
repositories {
mavenCentral()
maven(url = "https://packages.confluent.io/maven/")
maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots")
}
}

Expand Down
4 changes: 2 additions & 2 deletions charts/producer-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ spec:
imagePullPolicy: "{{ .Values.imagePullPolicy }}"
resources:
{{ toYaml .Values.resources | indent 12 }}
args:
- clean
env:
- name: ENV_PREFIX
value: {{ .Values.configurationEnvPrefix }}_
Expand All @@ -74,8 +76,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_DEBUG"
value: {{ .Values.debug | quote }}
{{- end }}
- name: "{{ .Values.configurationEnvPrefix }}_CLEAN_UP"
value: "true"
{{- if hasKey .Values.streams "outputTopic" }}
- name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC"
value: {{ .Values.streams.outputTopic | quote }}
Expand Down
2 changes: 1 addition & 1 deletion charts/producer-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Alternatively, a YAML file that specifies the values for the parameters can be p
### Streams

| Parameter | Description | Default |
| --------------------------- | ---------------------------------------------------------------------------------------------------------- | ------- |
|-----------------------------|------------------------------------------------------------------------------------------------------------|---------|
| `streams.brokers` | Comma separated list of Kafka brokers to connect to. | |
| `streams.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` |
| `streams.config` | Configurations for your [Kafka producer app](https://kafka.apache.org/documentation/#producerconfigs). | `{}` |
Expand Down
16 changes: 6 additions & 10 deletions charts/streams-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ spec:
imagePullPolicy: "{{ .Values.imagePullPolicy }}"
resources:
{{ toYaml .Values.resources | indent 12 }}
args:
{{- if .Values.streams.deleteOutput }}
- clean
{{- else }}
- reset
{{- end }}
env:
- name: ENV_PREFIX
value: {{ .Values.configurationEnvPrefix }}_
Expand All @@ -70,20 +76,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.streams.schemaRegistryUrl | quote }}
{{- end }}
{{- if hasKey .Values.streams "productive" }}
- name: "{{ .Values.configurationEnvPrefix }}_PRODUCTIVE"
value: {{ .Values.streams.productive | quote }}
{{- end }}
{{- if hasKey .Values "debug" }}
- name: "{{ .Values.configurationEnvPrefix }}_DEBUG"
value: {{ .Values.debug | quote }}
{{- end }}
- name: "{{ .Values.configurationEnvPrefix }}_CLEAN_UP"
value: "true"
{{- if hasKey .Values.streams "deleteOutput" }}
- name: "{{ .Values.configurationEnvPrefix }}_DELETE_OUTPUT"
value: {{ .Values.streams.deleteOutput | quote }}
{{- end }}
{{- if and (hasKey .Values.streams "inputTopics") (.Values.streams.inputTopics) }}
- name: "{{ .Values.configurationEnvPrefix }}_INPUT_TOPICS"
value: {{ .Values.streams.inputTopics | join "," | quote }}
Expand Down
1 change: 0 additions & 1 deletion charts/streams-app-cleanup-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ streams:
extraOutputTopics: {}
# role: output
# errorTopic: error
# productive: true
deleteOutput: false

commandLine: {}
Expand Down
29 changes: 14 additions & 15 deletions charts/streams-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,20 @@ Alternatively, a YAML file that specifies the values for the parameters can be p

### Streams

| Parameter | Description | Default |
| ------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `streams.brokers` | Comma separated list of Kafka brokers to connect to. | |
| `streams.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` |
| `streams.staticMembership` | Whether to use [Kafka Static Group Membership](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances). | `false` |
| `streams.optimizeLeaveGroupBehavior` | Enabling this optimizes the leave group behavior when a pod is terminated. Depends on the deployment kind, i.e., `statefulSet`. Requires the app to use streams-bootstrap 2.7+. | `true` |
| `streams.config` | Configurations for your [Kafka Streams app](https://kafka.apache.org/documentation/#streamsconfigs). | `{}` |
| `streams.inputTopics` | List of input topics for your streams application. | `[]` |
| `streams.extraInputTopics` | Map of additional named input topics if you need to specify multiple topics with different message types. | `{}` |
| `streams.inputPattern` | Input pattern of topics for your streams application. | |
| `streams.extraInputPatterns` | Map of additional named input patterns if you need to specify multiple topics with different message types. | `{}` |
| `streams.outputTopic` | Output topic for your streams application. | |
| `streams.extraOutputTopics` | Map of additional named output topics if you need to specify multiple topics with different message types. | `{}` |
| `streams.errorTopic` | Error topic for your streams application. | |
| `streams.productive` | Whether to use Kafka configuration values that are more suitable for production environments. | `true` |
| Parameter | Description | Default |
|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `streams.brokers` | Comma separated list of Kafka brokers to connect to. | |
| `streams.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` |
| `streams.staticMembership` | Whether to use [Kafka Static Group Membership](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances). | `false` |
| `streams.config` | Configurations for your [Kafka Streams app](https://kafka.apache.org/documentation/#streamsconfigs). | `{}` |
| `streams.inputTopics` | List of input topics for your streams application. | `[]` |
| `streams.extraInputTopics` | Map of additional named input topics if you need to specify multiple topics with different message types. | `{}` |
| `streams.inputPattern` | Input pattern of topics for your streams application. | |
| `streams.extraInputPatterns` | Map of additional named input patterns if you need to specify multiple topics with different message types. | `{}` |
| `streams.outputTopic` | Output topic for your streams application. | |
| `streams.extraOutputTopics` | Map of additional named output topics if you need to specify multiple topics with different message types. | `{}` |
| `streams.errorTopic` | Error topic for your streams application. | |
| `streams.productive` | Whether to use Kafka configuration values that are more suitable for production environments. | `true` |

### Other

Expand Down
6 changes: 1 addition & 5 deletions charts/streams-app/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ spec:
- name: KAFKA_JMX_PORT
value: "{{ .Values.jmx.port }}"
{{- end }}
{{- if and (.Values.streams.optimizeLeaveGroupBehavior) (not .Values.statefulSet) }}
{{- if not .Values.statefulSet }}
- name: "{{ .Values.configurationEnvPrefix }}_VOLATILE_GROUP_INSTANCE_ID"
value: "true"
{{- end }}
Expand All @@ -123,10 +123,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.streams.schemaRegistryUrl | quote }}
{{- end }}
{{- if hasKey .Values.streams "productive" }}
- name: "{{ .Values.configurationEnvPrefix }}_PRODUCTIVE"
value: {{ .Values.streams.productive | quote }}
{{- end }}
{{- if hasKey .Values "debug" }}
- name: "{{ .Values.configurationEnvPrefix }}_DEBUG"
value: {{ .Values.debug | quote }}
Expand Down
2 changes: 0 additions & 2 deletions charts/streams-app/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ streams:
# brokers: "test:9092"
# schemaRegistryUrl: "url:1234"
staticMembership: false
optimizeLeaveGroupBehavior: true
config: {}
# max.poll.records: 500
# Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
Expand All @@ -46,7 +45,6 @@ streams:
extraOutputTopics: {}
# role: output
# errorTopic: error
# productive: true

commandLine: {}
# MY_CLI_PARAM: "foo-bar"
Expand Down
14 changes: 10 additions & 4 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
version=2.23.1-SNAPSHOT
version=3.0.0-SNAPSHOT
org.gradle.caching=true
org.gradle.parallel=true
# running Kafka JUnit in parallel causes problems
org.gradle.parallel=false
kafkaVersion=3.6.1
kafkaJunitVersion=3.6.0
confluentVersion=7.6.0
fluentKafkaVersion=2.13.1
org.gradle.jvmargs=-Xmx2048m
fluentKafkaVersion=2.14.0
junitVersion=5.10.2
mockitoVersion=5.11.0
assertJVersion=3.25.3
log4jVersion=2.23.1
org.gradle.jvmargs=-Xmx4096m
3 changes: 2 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ pluginManagement {
rootProject.name = 'streams-bootstrap'

include(
":streams-bootstrap",
":streams-bootstrap-core",
":streams-bootstrap-test",
":streams-bootstrap-large-messages",
":streams-bootstrap-cli",
)
Loading

0 comments on commit 3c8880c

Please sign in to comment.