diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index b2eeb1a54d1da..e87669ab2b0a4 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -149,10 +149,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { Map consumerConfigs = new HashMap<>( MoreObjects.firstNonNull(configuration.getConsumerConfigUpdates(), new HashMap<>())); - consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId); - consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); - consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); - consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + consumerConfigs.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId); + consumerConfigs.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + consumerConfigs.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); + consumerConfigs.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); String format = configuration.getFormat(); boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());