Skip to content

Commit

Permalink
Merge pull request #1512 from AnandInguva:fix-no-auth
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 630530928
  • Loading branch information
cloud-teleport committed May 3, 2024
2 parents 7939984 + 1e7312c commit ba3d4e3
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,13 @@ public static PipelineResult run(KafkaToGcsOptions options) throws UnsupportedOp

options.setStreaming(true);

String kafkaSaslPlainUserName = SecretManagerUtils.getSecret(options.getUserNameSecretID());
String kafkaSaslPlainPassword = SecretManagerUtils.getSecret(options.getPasswordSecretID());

Map<String, Object> kafkaConfig = new HashMap<>();
// Set offset to either earliest or latest.
kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, options.getOffset());
// Authenticate to Kafka only when user provides authentication params.
if (useKafkaAuth) {
String kafkaSaslPlainUserName = SecretManagerUtils.getSecret(options.getUserNameSecretID());
String kafkaSaslPlainPassword = SecretManagerUtils.getSecret(options.getPasswordSecretID());
kafkaConfig.putAll(
ClientAuthConfig.getSaslPlainConfig(kafkaSaslPlainUserName, kafkaSaslPlainPassword));
}
Expand All @@ -277,6 +276,7 @@ public static void validateAuthOptions(KafkaToGcsOptions options) {
// the dataflow pipeline and Kafka broker is on the same network.
if (options.getUserNameSecretID().isBlank() && options.getPasswordSecretID().isBlank()) {
useKafkaAuth = false;
return;
}

if ((options.getUserNameSecretID().isBlank() && !options.getPasswordSecretID().isBlank())
Expand Down

0 comments on commit ba3d4e3

Please sign in to comment.