diff --git a/v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGcs2.java b/v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGcs2.java index 2860a8bc83..db6c31de26 100644 --- a/v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGcs2.java +++ b/v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGcs2.java @@ -244,14 +244,13 @@ public static PipelineResult run(KafkaToGcsOptions options) throws UnsupportedOp options.setStreaming(true); - String kafkaSaslPlainUserName = SecretManagerUtils.getSecret(options.getUserNameSecretID()); - String kafkaSaslPlainPassword = SecretManagerUtils.getSecret(options.getPasswordSecretID()); - Map kafkaConfig = new HashMap<>(); // Set offset to either earliest or latest. kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, options.getOffset()); // Authenticate to Kafka only when user provides authentication params. if (useKafkaAuth) { + String kafkaSaslPlainUserName = SecretManagerUtils.getSecret(options.getUserNameSecretID()); + String kafkaSaslPlainPassword = SecretManagerUtils.getSecret(options.getPasswordSecretID()); kafkaConfig.putAll( ClientAuthConfig.getSaslPlainConfig(kafkaSaslPlainUserName, kafkaSaslPlainPassword)); } @@ -277,6 +276,7 @@ public static void validateAuthOptions(KafkaToGcsOptions options) { // the dataflow pipeline and Kafka broker is on the same network. if (options.getUserNameSecretID().isBlank() && options.getPasswordSecretID().isBlank()) { useKafkaAuth = false; + return; } if ((options.getUserNameSecretID().isBlank() && !options.getPasswordSecretID().isBlank())