diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 989872b1..fc9eca7b 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -23,6 +23,8 @@ resources:
 
 jobs:
 - template: azure/gradle/build.yml@templates
+  parameters:
+    jdkVersion: '1.17'
 - template: azure/gradle/create_tag_version.yml@templates
 - template: azure/gradle/upload_release.yml@templates
 - template: azure/gradle/upload_snapshot.yml@templates
diff --git a/build.gradle.kts b/build.gradle.kts
index 2e6f7933..2184f083 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -1,7 +1,7 @@
 plugins {
     id("net.researchgate.release") version "3.0.2"
-    id("com.bakdata.sonar") version "1.1.7"
-    id("com.bakdata.sonatype") version "1.1.7"
+    id("com.bakdata.sonar") version "1.1.9"
+    id("com.bakdata.sonatype") version "1.1.9"
     id("org.hildan.github.changelog") version "1.12.1"
     id("io.freefair.lombok") version "6.6.1"
 }
diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml
index d074490a..eaee0047 100644
--- a/charts/streams-app-cleanup-job/templates/job.yaml
+++ b/charts/streams-app-cleanup-job/templates/job.yaml
@@ -112,6 +112,10 @@ spec:
             - name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS"
               value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
             {{- end }}
+            {{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }}
+            - name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID"
+              value: {{ .Values.autoscaling.consumerGroup | quote }}
+            {{- end }}
             {{- range $key, $value := .Values.secrets }}
             - name: "{{ $key }}"
               valueFrom:
diff --git a/charts/streams-app-cleanup-job/values.yaml b/charts/streams-app-cleanup-job/values.yaml
index 9fa76774..cb091e05 100644
--- a/charts/streams-app-cleanup-job/values.yaml
+++ b/charts/streams-app-cleanup-job/values.yaml
@@ -15,6 +15,7 @@ configurationEnvPrefix: "APP"
 streams:
 #  brokers: "test:9092"
 #  schemaRegistryUrl: "url:1234"
+  passApplicationId: false
   config: {}
 #    max.poll.records: 500
 #    Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
@@ -59,6 +60,9 @@ labels: {}
 
 # serviceAccountName: foo
 
+autoscaling: {}
+  # consumerGroup: foo
+
 tolerations: []
 #   - key: "foo"
 #     operator: "Exists"
diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml
index cd900fa2..9f357aad 100644
--- a/charts/streams-app/templates/deployment.yaml
+++ b/charts/streams-app/templates/deployment.yaml
@@ -175,6 +175,10 @@ spec:
             - name: "{{ .Values.configurationEnvPrefix }}_EXTRA_INPUT_PATTERNS"
               value: "{{- range $key, $value := .Values.streams.extraInputPatterns }}{{ $key }}={{ $value }},{{- end }}"
           {{- end }}
+          {{- if and (.Values.streams.passApplicationId) (hasKey .Values.autoscaling "consumerGroup") }}
+            - name: "{{ .Values.configurationEnvPrefix }}_APPLICATION_ID"
+              value: {{ .Values.autoscaling.consumerGroup | quote }}
+          {{- end }}
           {{- range $key, $value := .Values.secrets }}
             - name: "{{ $key }}"
               valueFrom:
diff --git a/charts/streams-app/values.yaml b/charts/streams-app/values.yaml
index 0309adb6..479bb98b 100644
--- a/charts/streams-app/values.yaml
+++ b/charts/streams-app/values.yaml
@@ -27,6 +27,7 @@ streams:
   # schemaRegistryUrl: "url:1234"
   staticMembership: false
   optimizeLeaveGroupBehavior: true
+  passApplicationId: false
   config: {}
   #   max.poll.records: 500
   #   Note that YAML may convert large integers to scientific notation. Use Strings to avoid this.
diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java
index 33427103..4e66b77e 100644
--- a/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java
+++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2023 bakdata
+ * Copyright (c) 2024 bakdata
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -94,6 +94,8 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement
     @CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1",
             description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.")
     private boolean volatileGroupInstanceId;
+    @CommandLine.Option(names = "--application-id", description = "Application ID to use for Kafka Streams")
+    private String applicationId;
     private KafkaStreams streams;
     private Throwable lastException;
 
@@ -159,7 +161,9 @@ public void close() {
      * This must be set to a unique value for every application interacting with your kafka cluster to ensure internal
      * state encapsulation. Could be set to: className-inputTopic-outputTopic
      */
-    public abstract String getUniqueAppId();
+    public String getUniqueAppId() {
+        return null;
+    }
 
     /**
      * Create the topology of the Kafka Streams app
@@ -235,6 +239,23 @@ protected StreamsUncaughtExceptionHandler getUncaughtExceptionHandler() {
         return new DefaultStreamsUncaughtExceptionHandler();
     }
 
+    public final String getStreamsApplicationId() {
+        final String uniqueAppId = this.getUniqueAppId();
+        if(uniqueAppId == null) {
+            if(this.applicationId == null) {
+                throw new IllegalArgumentException("Must pass --application-id or implement #getUniqueAppId()");
+            }
+            return this.applicationId;
+        }
+        if(this.applicationId == null) {
+            return uniqueAppId;
+        }
+        if(!uniqueAppId.equals(this.applicationId)) {
+            throw new IllegalArgumentException("Application ID provided via --application-id does not match #getUniqueAppId()");
+        }
+        return uniqueAppId;
+    }
+
     /**
      * <p>This method should give a default configuration to run your streaming application with.</p>
      * If {@link KafkaApplication#schemaRegistryUrl} is set {@link SpecificAvroSerde} is set as the default key, value
@@ -271,7 +292,7 @@ protected Properties createKafkaProperties() {
         kafkaConfig.setProperty(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip");
 
         // topology
-        kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId());
+        kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, this.getStreamsApplicationId());
 
         this.configureDefaultSerde(kafkaConfig);
         kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBrokers());
@@ -322,7 +343,7 @@ protected void runCleanUp() {
         try (final ImprovedAdminClient adminClient = this.createAdminClient()) {
             final CleanUpRunner cleanUpRunner = CleanUpRunner.builder()
                     .topology(this.createTopology())
-                    .appId(this.getUniqueAppId())
+                    .appId(this.getStreamsApplicationId())
                     .adminClient(adminClient)
                     .streams(this.streams)
                     .build();
diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java
index 667028a7..1553cc3e 100644
--- a/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java
+++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/CleanUpRunnerTest.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2023 bakdata
+ * Copyright (c) 2024 bakdata
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -40,7 +40,7 @@ class CleanUpRunnerTest {
     void createTemporaryPropertiesFile() throws IOException {
         final WordCount wordCount = new WordCount();
         wordCount.setInputTopics(List.of("input"));
-        final File file = CleanUpRunner.createTemporaryPropertiesFile(wordCount.getUniqueAppId(),
+        final File file = CleanUpRunner.createTemporaryPropertiesFile(wordCount.getStreamsApplicationId(),
                 wordCount.getKafkaProperties());
 
         assertThat(file).exists();
diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java
index 8e21f9eb..03be66ee 100644
--- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java
+++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2023 bakdata
+ * Copyright (c) 2024 bakdata
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -172,7 +172,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException {
             this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
                     .extracting(ConsumerGroupListing::groupId)
                     .as("Consumer group exists")
-                    .contains(this.app.getUniqueAppId());
+                    .contains(this.app.getStreamsApplicationId());
         } catch (final TimeoutException | ExecutionException e) {
             throw new RuntimeException("Error retrieving consumer groups", e);
         }
@@ -184,7 +184,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException {
             this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
                     .extracting(ConsumerGroupListing::groupId)
                     .as("Consumer group is deleted")
-                    .doesNotContain(this.app.getUniqueAppId());
+                    .doesNotContain(this.app.getStreamsApplicationId());
         } catch (final TimeoutException | ExecutionException e) {
             throw new RuntimeException("Error retrieving consumer groups", e);
         }
@@ -211,7 +211,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept
             this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
                     .extracting(ConsumerGroupListing::groupId)
                     .as("Consumer group exists")
-                    .contains(this.app.getUniqueAppId());
+                    .contains(this.app.getStreamsApplicationId());
         } catch (final TimeoutException | ExecutionException e) {
             throw new RuntimeException("Error retrieving consumer groups", e);
         }
@@ -219,12 +219,12 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept
         delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
         try (final AdminClient adminClient = AdminClient.create(this.app.getKafkaProperties())) {
-            adminClient.deleteConsumerGroups(List.of(this.app.getUniqueAppId())).all()
+            adminClient.deleteConsumerGroups(List.of(this.app.getStreamsApplicationId())).all()
                     .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
             this.softly.assertThat(adminClient.listConsumerGroups().all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
                     .extracting(ConsumerGroupListing::groupId)
                     .as("Consumer group is deleted")
-                    .doesNotContain(this.app.getUniqueAppId());
+                    .doesNotContain(this.app.getStreamsApplicationId());
         } catch (final TimeoutException | ExecutionException e) {
             throw new RuntimeException("Error deleting consumer group", e);
         }
@@ -237,9 +237,9 @@ void shouldDeleteInternalTopics() throws InterruptedException {
 
         final String inputTopic = this.app.getInputTopic();
         final String internalTopic =
-                this.app.getUniqueAppId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition";
+                this.app.getStreamsApplicationId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition";
         final String backingTopic =
-                this.app.getUniqueAppId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog";
+                this.app.getStreamsApplicationId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog";
         final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC;
 
         final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
@@ -410,9 +410,9 @@ void shouldDeleteSchemaOfInternalTopics()
 
         final String inputSubject = this.app.getInputTopic() + "-value";
         final String internalSubject =
-                this.app.getUniqueAppId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value";
+                this.app.getStreamsApplicationId() + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value";
         final String backingSubject =
-                this.app.getUniqueAppId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value";
+                this.app.getStreamsApplicationId() + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value";
         final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value";
 
         final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient();
@@ -469,7 +469,7 @@ void shouldCallCleanupHookForInternalTopics() {
         this.app = this.createComplexCleanUpHookApplication();
 
         this.runCleanUp();
-        final String uniqueAppId = this.app.getUniqueAppId();
+        final String uniqueAppId = this.app.getStreamsApplicationId();
         verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition");
         verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog");
         verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog");
@@ -481,7 +481,7 @@ void shouldCallCleanUpHookForAllTopics() {
         this.app = this.createComplexCleanUpHookApplication();
 
         this.runCleanUpWithDeletion();
-        final String uniqueAppId = this.app.getUniqueAppId();
+        final String uniqueAppId = this.app.getStreamsApplicationId();
         verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition");
         verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog");
         verify(this.topicCleanUpHook).accept(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog");
diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java
index 6ea870a3..1fc24a5a 100644
--- a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java
+++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2023 bakdata
+ * Copyright (c) 2024 bakdata
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -49,7 +49,7 @@ void setup() {
         this.app = new ComplexTopologyApplication();
         this.app.setInputTopics(List.of("input", "input2"));
         this.app.setOutputTopic("output");
-        this.topologyInformation = new TopologyInformation(this.app.createTopology(), this.app.getUniqueAppId());
+        this.topologyInformation = new TopologyInformation(this.app.createTopology(), this.app.getStreamsApplicationId());
     }
 
     @Test
@@ -131,7 +131,7 @@ void shouldNotReturnInputTopics() {
     void shouldReturnAllInternalTopics() {
         assertThat(this.topologyInformation.getInternalTopics())
                 .hasSize(3)
-                .allMatch(topic -> topic.contains("-KSTREAM-") && topic.startsWith(this.app.getUniqueAppId())
+                .allMatch(topic -> topic.contains("-KSTREAM-") && topic.startsWith(this.app.getStreamsApplicationId())
                         || topic.startsWith("KSTREAM-"))
                 .allMatch(topic -> topic.endsWith("-changelog") || topic.endsWith("-repartition"));
     }