Skip to content

Commit

Permalink
Add CLI parameter to specify streams application ID
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jul 22, 2024
1 parent 3c8880c commit 9e7fdc7
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 11 deletions.
4 changes: 4 additions & 0 deletions charts/streams-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS"
value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- if hasKey .Values.streams "applicationId" }}
- name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID"
value: {{ .Values.streams.applicationId | quote }}
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
valueFrom:
Expand Down
2 changes: 1 addition & 1 deletion charts/streams-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Alternatively, a YAML file that specifies the values for the parameters can be p
| `streams.extraOutputTopics` | Map of additional named output topics if you need to specify multiple topics with different message types. | `{}` |
| `streams.errorTopic` | Error topic for your streams application. | |
| `streams.productive` | Whether to use Kafka configuration values that are more suitable for production environments. | `true` |
| `streams.applicationId` | Unique application ID for Kafka Streams. Required for auto-scaling | |

### Other

Expand Down Expand Up @@ -101,7 +102,6 @@ Alternatively, a YAML file that specifies the values for the parameters can be p
| Parameter | Description | Default |
| -------------------------------- | ------------------------------------------------------------------------------------------------------------------ | ---------- |
| `autoscaling.enabled` | Whether to enable auto-scaling using [KEDA](https://keda.sh/docs/latest/scalers/apache-kafka/). | `false` |
| `autoscaling.consumerGroup` | Name of the consumer group used for checking the offset on the topic and processing the related lag. | |
| `autoscaling.lagThreshold` | Average target value to trigger scaling actions. | |
| `autoscaling.pollingInterval` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#pollinginterval | `30` |
| `autoscaling.cooldownPeriod` | https://keda.sh/docs/2.10/concepts/scaling-deployments/#cooldownperiod | `300` |
Expand Down
10 changes: 7 additions & 3 deletions charts/streams-app/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ kind: Deployment
{{- end }}
metadata:
name: {{ template "streams-app.fullname" . }}
{{- if or .Values.autoscaling.consumerGroup .Values.annotations }}
{{- if or .Values.streams.applicationId .Values.annotations }}
annotations:
{{- range $key, $value := .Values.annotations }}
{{ $key | quote }}: {{ $value | quote }}
{{- end }}
{{- if and .Values.autoscaling.consumerGroup (not .Values.annotations.consumerGroup) }}
consumerGroup: {{ .Values.autoscaling.consumerGroup | quote }}
{{- if and .Values.streams.applicationId (not .Values.annotations.consumerGroup) }}
consumerGroup: {{ .Values.streams.applicationId | quote }}
{{- end }}
{{- end }}
labels:
Expand Down Expand Up @@ -156,6 +156,10 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS"
value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
{{- end }}
{{- if hasKey .Values.streams "applicationId" }}
- name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID"
value: {{ .Values.streams.applicationId | quote }}
{{- end }}
{{- range $key, $value := .Values.secrets }}
- name: "{{ $key }}"
valueFrom:
Expand Down
10 changes: 5 additions & 5 deletions charts/streams-app/templates/scaled-object.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ spec:
- type: kafka
metadata:
bootstrapServers: {{ $root.Values.streams.brokers }}
consumerGroup: {{ $root.Values.autoscaling.consumerGroup }}
consumerGroup: {{ $root.Values.streams.applicationId }}
topic: {{ . | quote }}
lagThreshold: {{ $root.Values.autoscaling.lagThreshold | quote }}
offsetResetPolicy: {{ $root.Values.autoscaling.offsetResetPolicy }}
Expand All @@ -44,16 +44,16 @@ spec:
- type: kafka
metadata:
bootstrapServers: {{ $root.Values.streams.brokers }}
consumerGroup: {{ $root.Values.autoscaling.consumerGroup }}
topic: {{ printf "%s-%s" $root.Values.autoscaling.consumerGroup . | quote }}
consumerGroup: {{ $root.Values.streams.applicationId }}
topic: {{ printf "%s-%s" $root.Values.streams.applicationId . | quote }}
lagThreshold: {{ $root.Values.autoscaling.lagThreshold | quote }}
offsetResetPolicy: {{ $root.Values.autoscaling.offsetResetPolicy }}
{{- end }}
{{- range .Values.autoscaling.topics }}
- type: kafka
metadata:
bootstrapServers: {{ $root.Values.streams.brokers }}
consumerGroup: {{ $root.Values.autoscaling.consumerGroup }}
consumerGroup: {{ $root.Values.streams.applicationId }}
topic: {{ . | quote }}
lagThreshold: {{ $root.Values.autoscaling.lagThreshold | quote }}
offsetResetPolicy: {{ $root.Values.autoscaling.offsetResetPolicy }}
Expand All @@ -63,7 +63,7 @@ spec:
- type: kafka
metadata:
bootstrapServers: {{ $root.Values.streams.brokers }}
consumerGroup: {{ $root.Values.autoscaling.consumerGroup }}
consumerGroup: {{ $root.Values.streams.applicationId }}
topic: {{ $topic | quote }}
lagThreshold: {{ $root.Values.autoscaling.lagThreshold | quote }}
offsetResetPolicy: {{ $root.Values.autoscaling.offsetResetPolicy }}
Expand Down
2 changes: 1 addition & 1 deletion charts/streams-app/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ streams:
extraOutputTopics: {}
# role: output
# errorTopic: error
# applicationId: foo

commandLine: {}
# MY_CLI_PARAM: "foo-bar"
Expand Down Expand Up @@ -99,7 +100,6 @@ jmx:

autoscaling:
enabled: false
# consumerGroup: foo
# lagThreshold: "1000"
pollingInterval: 30
cooldownPeriod: 300
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ public abstract class KafkaStreamsApplication extends
@CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1",
description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.")
private boolean volatileGroupInstanceId;
@CommandLine.Option(names = "--application-id",
description = "Unique application ID to use for Kafka Streams. Can also be provided by implementing "
+ "StreamsApp#getUniqueAppId()")
private String uniqueAppId;

/**
* Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate
Expand Down Expand Up @@ -134,7 +138,14 @@ public final StreamsTopicConfig createTopicConfig() {
@Override
public final ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final StreamsApp app,
final AppConfiguration<StreamsTopicConfig> configuration) {
return new ConfiguredStreamsApp<>(app, configuration);
final ConfiguredStreamsApp<StreamsApp> configuredApp = new ConfiguredStreamsApp<>(app, configuration);
if (this.uniqueAppId != null) {
if (!configuredApp.getUniqueAppId().equals(this.uniqueAppId)) {
throw new IllegalArgumentException(
"Application ID provided via --application-id does not match StreamsApp#getUniqueAppId()");
}
}
return configuredApp;
}

/**
Expand Down
27 changes: 27 additions & 0 deletions streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,33 @@ public void run() {
});
}

@Test
@ExpectSystemExitWithStatus(2)
void shouldExitWithErrorCodeOnInconsistentAppId() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
@Override
public StreamsApp createApp(final boolean cleanUp) {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
builder.streamInput().to(builder.getTopics().getOutputTopic());
}

@Override
public String getUniqueAppId(final StreamsTopicConfig topics) {
return "my-id";
}
};
}
}, new String[]{
"--brokers", "localhost:9092",
"--schema-registry-url", "http://localhost:8081",
"--input-topics", "input",
"--output-topic", "output",
"--application-id", "my-other-id"
});
}

@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorInTopology() throws InterruptedException {
Expand Down

0 comments on commit 9e7fdc7

Please sign in to comment.