Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/v3' into feature/consumer-group-…
Browse files Browse the repository at this point in the history
…param

# Conflicts:
#	charts/streams-app/README.md
#	charts/streams-app/templates/scaled-object.yaml
  • Loading branch information
philipp94831 committed Jul 25, 2024
2 parents 7b5d7db + 1a4b565 commit c1c0be4
Show file tree
Hide file tree
Showing 50 changed files with 707 additions and 318 deletions.
26 changes: 18 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ and `getUniqueAppId()`. You can define the topology of your application in `buil

```java
import com.bakdata.kafka.KafkaStreamsApplication;
import com.bakdata.kafka.SerdeConfig;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsBootstrapApplication extends KafkaStreamsApplication {
public class MyStreamsApplication extends KafkaStreamsApplication {
public static void main(final String[] args) {
startApplication(new StreamsBootstrapApplication(), args);
startApplication(new MyStreamsApplication(), args);
}

@Override
Expand All @@ -86,6 +88,11 @@ public class StreamsBootstrapApplication extends KafkaStreamsApplication {
return "streams-bootstrap-app-" + topics.getOutputTopic();
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}

// Optionally you can define custom Kafka properties
@Override
public Map<String, Object> createKafkaProperties() {
Expand Down Expand Up @@ -125,8 +132,6 @@ The following configuration options are available:

- `--volatile-group-instance-id`: Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.

- `--debug`: Configure logging to debug

Additionally, the following commands are available:

- `clean`: Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate
Expand All @@ -144,12 +149,14 @@ import com.bakdata.kafka.KafkaProducerApplication;
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.ProducerRunnable;
import com.bakdata.kafka.SerializerConfig;
import java.util.Map;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

public class StreamsBootstrapApplication extends KafkaProducerApplication {
public class MyProducerApplication extends KafkaProducerApplication {
public static void main(final String[] args) {
startApplication(new StreamsBootstrapApplication(), args);
startApplication(new MyProducerApplication(), args);
}

@Override
Expand All @@ -164,6 +171,11 @@ public class StreamsBootstrapApplication extends KafkaProducerApplication {
};
}

@Override
public SerializerConfig defaultSerializationConfig() {
return new SerializerConfig(StringSerializer.class, StringSerializer.class);
}

// Optionally you can define custom Kafka properties
@Override
public Map<String, Object> createKafkaProperties() {
Expand All @@ -188,8 +200,6 @@ The following configuration options are available:

- `--extra-output-topics`: Additional named output topics (`String=String>[,<String=String>...]`)

- `--debug`: Configure logging to debug

Additionally, the following commands are available:

- `clean`: Delete all output topics associated with the Kafka Producer application.
Expand Down
24 changes: 10 additions & 14 deletions charts/producer-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,29 +60,25 @@ spec:
env:
- name: ENV_PREFIX
value: {{ .Values.configurationEnvPrefix }}_
{{- range $key, $value := .Values.streams.config }}
- name: {{ printf "STREAMS_%s" $key | replace "." "_" | upper | quote }}
{{- range $key, $value := .Values.kafka.config }}
- name: {{ printf "KAFKA_%s" $key | replace "." "_" | upper | quote }}
value: {{ $value | quote }}
{{- end }}
{{- if hasKey .Values.streams "brokers" }}
{{- if hasKey .Values.kafka "brokers" }}
- name: "{{ .Values.configurationEnvPrefix }}_BROKERS"
value: {{ .Values.streams.brokers | quote }}
value: {{ .Values.kafka.brokers | quote }}
{{- end }}
{{- if hasKey .Values.streams "schemaRegistryUrl" }}
{{- if hasKey .Values.kafka "schemaRegistryUrl" }}
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.streams.schemaRegistryUrl | quote }}
value: {{ .Values.kafka.schemaRegistryUrl | quote }}
{{- end }}
{{- if hasKey .Values "debug" }}
- name: "{{ .Values.configurationEnvPrefix }}_DEBUG"
value: {{ .Values.debug | quote }}
{{- end }}
{{- if hasKey .Values.streams "outputTopic" }}
{{- if hasKey .Values.kafka "outputTopic" }}
- name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC"
value: {{ .Values.streams.outputTopic | quote }}
value: {{ .Values.kafka.outputTopic | quote }}
{{- end }}
{{- if and (hasKey .Values.streams "extraOutputTopics") (.Values.streams.extraOutputTopics) }}
{{- if and (hasKey .Values.kafka "extraOutputTopics") (.Values.kafka.extraOutputTopics) }}
- name: "{{ .Values.configurationEnvPrefix }}_EXTRA_OUTPUT_TOPICS"
value: "{{- range $key, $value := .Values.streams.extraOutputTopics }}{{ $key }}={{ $value }},{{- end }}"
value: "{{- range $key, $value := .Values.kafka.extraOutputTopics }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
Expand Down
4 changes: 1 addition & 3 deletions charts/producer-app-cleanup-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ files: {}
# mountPath: app/resources
# content: "foo bar"

streams:
kafka:
# brokers: "test:9092"
# schemaRegistryUrl: "url:1234"
config: {}
Expand All @@ -29,8 +29,6 @@ streams:
commandLine: {}
# MY_CLI_PARAM: "foo-bar"

debug: false

env: {}
# MY_ENV_VARIABLE: foo-bar

Expand Down
33 changes: 16 additions & 17 deletions charts/producer-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,25 @@ Alternatively, a YAML file that specifies the values for the parameters can be p

### Streams

| Parameter | Description | Default |
|-----------------------------|------------------------------------------------------------------------------------------------------------|---------|
| `streams.brokers` | Comma separated list of Kafka brokers to connect to. | |
| `streams.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` |
| `streams.config` | Configurations for your [Kafka producer app](https://kafka.apache.org/documentation/#producerconfigs). | `{}` |
| `streams.outputTopic` | Output topic for your producer application. | |
| `streams.extraOutputTopics` | Map of additional named output topics if you need to specify multiple topics with different message types. | `{}` |
| Parameter | Description | Default |
|---------------------------|------------------------------------------------------------------------------------------------------------|---------|
| `kafka.brokers` | Comma separated list of Kafka brokers to connect to. | |
| `kafka.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` |
| `kafka.config` | Configurations for your [Kafka producer app](https://kafka.apache.org/documentation/#producerconfigs). | `{}` |
| `kafka.outputTopic` | Output topic for your producer application. | |
| `kafka.extraOutputTopics` | Map of additional named output topics if you need to specify multiple topics with different message types. | `{}` |

### Other

| Parameter | Description | Default |
| ------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `configurationEnvPrefix` | Prefix for environment variables to use that should be parsed as command line arguments. | `APP` |
| `commandLine` | Map of command line arguments passed to the producer app. | `{}` |
| `debug` | Configure logging to debug | `false` |
| `env` | Custom environment variables | `{}` |
| `secrets` | Custom secret environment variables. Prefix with `configurationEnvPrefix` in order to pass secrets to command line or prefix with `STREAMS_` to pass secrets to Kafka Streams configuration. E.g., `APP_MY_PARAM` would be passed as `--my-param` and `STREAMS_MAX_POLL_TIMEOUT_MS` would be translated to `max.poll.timeout.ms`. | `{}` |
| `secretRefs` | Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret `name` and `key`. | `{}` |
| `secretFilesRefs` | Mount existing secrets as volumes | `[]` |
| `files` | Map of files to mount for the app. File will be mounted as `$value.mountPath/$key`. `$value.content` denotes file content (recommended to be used with `--set-file`). | `{}` |
| Parameter | Description | Default |
|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `configurationEnvPrefix` | Prefix for environment variables to use that should be parsed as command line arguments. | `APP` |
| `commandLine` | Map of command line arguments passed to the producer app. | `{}` |
| `env` | Custom environment variables | `{}` |
| `secrets` | Custom secret environment variables. Prefix with `configurationEnvPrefix` in order to pass secrets to command line or prefix with `KAFKA_` to pass secrets to Kafka Streams configuration. E.g., `APP_MY_PARAM` would be passed as `--my-param` and `KAFKA_MAX_POLL_TIMEOUT_MS` would be translated to `max.poll.timeout.ms`. | `{}` |
| `secretRefs` | Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret `name` and `key`. | `{}` |
| `secretFilesRefs` | Mount existing secrets as volumes | `[]` |
| `files` | Map of files to mount for the app. File will be mounted as `$value.mountPath/$key`. `$value.content` denotes file content (recommended to be used with `--set-file`). | `{}` |

### JVM

Expand Down
24 changes: 10 additions & 14 deletions charts/producer-app/templates/pod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,25 @@ spec:
env:
- name: ENV_PREFIX
value: {{ .Values.configurationEnvPrefix }}_
{{- range $key, $value := .Values.streams.config }}
- name: {{ printf "STREAMS_%s" $key | replace "." "_" | upper | quote }}
{{- range $key, $value := .Values.kafka.config }}
- name: {{ printf "KAFKA_%s" $key | replace "." "_" | upper | quote }}
value: {{ $value | quote }}
{{- end }}
{{- if hasKey .Values.streams "brokers" }}
{{- if hasKey .Values.kafka "brokers" }}
- name: "{{ .Values.configurationEnvPrefix }}_BROKERS"
value: {{ .Values.streams.brokers | quote }}
value: {{ .Values.kafka.brokers | quote }}
{{- end }}
{{- if hasKey .Values.streams "schemaRegistryUrl" }}
{{- if hasKey .Values.kafka "schemaRegistryUrl" }}
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.streams.schemaRegistryUrl | quote }}
value: {{ .Values.kafka.schemaRegistryUrl | quote }}
{{- end }}
{{- if hasKey .Values "debug" }}
- name: "{{ .Values.configurationEnvPrefix }}_DEBUG"
value: {{ .Values.debug | quote }}
{{- end }}
{{- if hasKey .Values.streams "outputTopic" }}
{{- if hasKey .Values.kafka "outputTopic" }}
- name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC"
value: {{ .Values.streams.outputTopic | quote }}
value: {{ .Values.kafka.outputTopic | quote }}
{{- end }}
{{- if and (hasKey .Values.streams "extraOutputTopics") (.Values.streams.extraOutputTopics) }}
{{- if and (hasKey .Values.kafka "extraOutputTopics") (.Values.kafka.extraOutputTopics) }}
- name: "{{ .Values.configurationEnvPrefix }}_EXTRA_OUTPUT_TOPICS"
value: "{{- range $key, $value := .Values.streams.extraOutputTopics }}{{ $key }}={{ $value }},{{- end }}"
value: "{{- range $key, $value := .Values.kafka.extraOutputTopics }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
Expand Down
4 changes: 1 addition & 3 deletions charts/producer-app/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ resources:
memory: 2G
cpu: 500m

streams:
kafka:
# brokers: "test:9092"
# schemaRegistryUrl: "url:1234"
config: {}
Expand All @@ -62,8 +62,6 @@ streams:
commandLine: {}
# MY_CLI_PARAM: "foo-bar"

debug: false

env: {}
# MY_ENV_VARIABLE: foo-bar

Expand Down
Loading

0 comments on commit c1c0be4

Please sign in to comment.