Skip to content

Commit

Permalink
Merge pull request #1617 from AnandInguva:update_doc
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 638467962
  • Loading branch information
cloud-teleport committed May 30, 2024
2 parents cf1c8f8 + e7db9d0 commit 2ce397f
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ final class Offset {
groupName = "Source",
optional = true,
description = "Consumer Group ID",
helpText = "Consumer group ID to commit offsets to Kafka.")
helpText =
"The unique identifier for the consumer group that this pipeline belongs to."
+ " Required if Commit Offsets to Kafka is enabled.")
@Default.String("")
String getConsumerGroupId();

Expand All @@ -77,7 +79,8 @@ final class Offset {
optional = true,
description = "Default Kafka Start Offset",
helpText =
"The Kafka offset to read from. If there are no committed offsets on Kafka, default offset will be used.")
"The starting point for reading messages when no committed offsets exist."
+ " The earliest starts from the beginning, the latest from the newest message.")
@Default.String(Offset.LATEST)
String getKafkaReadOffset();

Expand All @@ -92,8 +95,12 @@ final class Offset {
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.TLS),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.NONE),
},
description = "Authentication Mode",
helpText = "Kafka read authentication mode. Can be NONE, SASL_PLAIN or TLS")
description = "Kafka Source Authentication Mode",
helpText =
"The mode of authentication to use with the Kafka cluster. "
+ "Use NONE for no authentication, SASL_PLAIN for SASL/PLAIN username and password, "
+ "and TLS for certificate-based authentication. "
+ "Apache Kafka for BigQuery only supports the SASL_PLAIN authentication mode.")
@Default.String("NONE")
String getKafkaReadAuthenticationMode();

Expand All @@ -105,10 +112,11 @@ final class Offset {
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = {KafkaAuthenticationMethod.SASL_PLAIN},
optional = true,
description = "Username",
description = "Secret Version ID For Kafka SASL/PLAIN Username",
helpText =
"Secret Manager secret ID for the SASL_PLAIN username. Should be in the format projects/{project}/secrets/{secret}/versions/{secret_version}.",
example = "projects/your-project-id/secrets/your-secret/versions/your-secret-version")
"The Google Cloud Secret Manager secret ID that contains the Kafka username "
+ "to use with SASL_PLAIN authentication.",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
@Default.String("")
String getKafkaReadUsernameSecretId();

Expand All @@ -120,10 +128,10 @@ final class Offset {
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = KafkaAuthenticationMethod.SASL_PLAIN,
optional = true,
description = "Password",
description = "Secret Version ID For Kafka SASL/PLAIN Password",
helpText =
"Secret Manager secret ID for the SASL_PLAIN password. Should be in the format projects/{project}/secrets/{secret}/versions/{secret_version}",
example = "projects/your-project-id/secrets/your-secret/versions/your-secret-version")
"The Google Cloud Secret Manager secret ID that contains the Kafka password to use with SASL_PLAIN authentication.",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
@Default.String("")
String getKafkaReadPasswordSecretId();

Expand All @@ -136,7 +144,8 @@ final class Offset {
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = {KafkaAuthenticationMethod.TLS},
helpText =
"Cloud storage path for the Keystore location that contains the TLS certificate and private key.",
"The Google Cloud Storage path to the Java KeyStore (JKS) file that contains the "
+ "TLS certificate and private key to use when authenticating with the Kafka cluster.",
description = "Location of Keystore",
example = "gs://your-bucket/keystore.jks")
String getKafkaReadKeystoreLocation();
Expand All @@ -151,8 +160,8 @@ final class Offset {
parentTriggerValues = {KafkaAuthenticationMethod.TLS},
description = "Truststore File Location",
helpText =
"Location of the jks file in Cloud Storage with TLS certificate to verify identity.",
example = "gs://your-bucket/truststore.jks")
"The Google Cloud Storage path to the Java TrustStore (JKS) file that contains"
+ " the trusted certificates to use to verify the identity of the Kafka broker.")
String getKafkaReadTruststoreLocation();

void setKafkaReadTruststoreLocation(String sourceTruststoreLocation);
Expand All @@ -163,11 +172,11 @@ final class Offset {
groupName = "Source",
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = {KafkaAuthenticationMethod.TLS},
helpText =
"Secret Version ID to get password to access secret in truststore for source Kafka.",
description = "Secret Version ID for Truststore Password",
example =
"projects/your-project-number/secrets/your-secret-name/versions/your-secret-version")
helpText =
"The Google Cloud Secret Manager secret ID that contains the password to "
+ "use to access the Java TrustStore (JKS) file for Kafka TLS authentication",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
String getKafkaReadTruststorePasswordSecretId();

void setKafkaReadTruststorePasswordSecretId(String sourceTruststorePasswordSecretId);
Expand All @@ -178,10 +187,11 @@ final class Offset {
groupName = "Source",
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = {KafkaAuthenticationMethod.TLS},
helpText = "Secret Version ID to get password to access secret keystore, for source kafka.",
description = "Secret Version ID of Keystore Password",
example =
"projects/your-project-number/secrets/your-secret-name/versions/your-secret-version")
helpText =
"The Google Cloud Secret Manager secret ID that contains the password to"
+ " use to access the Java KeyStore (JKS) file for Kafka TLS authentication.",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
String getKafkaReadKeystorePasswordSecretId();

void setKafkaReadKeystorePasswordSecretId(String sourceKeystorePasswordSecretId);
Expand All @@ -193,10 +203,10 @@ final class Offset {
groupName = "Source",
parentTriggerValues = {KafkaAuthenticationMethod.TLS},
helpText =
"Secret Version ID of password to access private key inside the keystore, for source Kafka.",
"The Google Cloud Secret Manager secret ID that contains the password to use to access the private key within the Java KeyStore (JKS) file"
+ " for Kafka TLS authentication.",
description = "Secret Version ID of Private Key Password",
example =
"projects/your-project-number/secrets/your-secret-name/versions/your-secret-version")
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
String getKafkaReadKeyPasswordSecretId();

void setKafkaReadKeyPasswordSecretId(String sourceKeyPasswordSecretId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public interface KafkaWriteOptions extends PipelineOptions {
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.TLS),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.NONE)
},
helpText = "Type of authentication mechanism to use with the destination Kafka.")
helpText =
"The mode of authentication to use with the Kafka cluster. "
+ "Use NONE for no authentication, SASL_PLAIN for SASL/PLAIN username and password, and"
+ " TLS for certificate-based authentication.")
@Default.String(KafkaAuthenticationMethod.NONE)
String getKafkaWriteAuthenticationMethod();

Expand All @@ -56,13 +59,12 @@ public interface KafkaWriteOptions extends PipelineOptions {
groupName = "Destination",
parentName = "kafkaWriteAuthenticationMethod",
parentTriggerValues = {KafkaAuthenticationMethod.SASL_PLAIN},
description = "Secret Version ID for Kafka username",
description = "Secret Version ID for Kafka SASL/PLAIN username",
helpText =
"Secret Version ID from the Secret Manager to get Kafka"
+ KafkaAuthenticationMethod.SASL_PLAIN
+ " username for the destination Kafka.",
example =
"projects/your-project-number/secrets/your-secret-name/versions/your-secret-version")
"The Google Cloud Secret Manager secret ID that contains the Kafka username "
+ " for SASL_PLAIN authentication with the destination Kafka cluster.",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
@Default.String("")
String getKafkaWriteUsernameSecretId();

void setKafkaWriteUsernameSecretId(String destinationUsernameSecretId);
Expand All @@ -73,70 +75,71 @@ public interface KafkaWriteOptions extends PipelineOptions {
optional = true,
parentName = "kafkaWriteAuthenticationMethod",
parentTriggerValues = {KafkaAuthenticationMethod.SASL_PLAIN},
description = "Secret Version ID for Kafka SASL/PLAIN password",
helpText =
"Secret Version ID from the Secret Manager to get Kafka "
+ KafkaAuthenticationMethod.SASL_PLAIN
+ " password for the destination Kafka.",
description = "Secret Version ID of for Kafka password",
example =
"projects/your-project-number/secrets/your-secret-name/versions/your-secret-version")
"The Google Cloud Secret Manager secret ID that contains the Kafka password to use for SASL_PLAIN authentication"
+ " with the destination Kafka cluster.",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
@Default.String("")
String getKafkaWritePasswordSecretId();

void setKafkaWritePasswordSecretId(String destinationPasswordSecretId);

@TemplateParameter.GcsReadFile(
order = 7,
optional = true,
groupName = "Destination",
parentName = "kafkaWriteAuthenticationMethod",
description = "Truststore File Location",
parentTriggerValues = {KafkaAuthenticationMethod.TLS},
description = "Location of Keystore",
helpText =
"Location of the jks file in Cloud Storage with TLS certificate to verify identity.")
String getKafkaWriteTruststoreLocation();
"The Google Cloud Storage path to the Java KeyStore (JKS) file that contains the TLS certificate "
+ "and private key for authenticating with the destination Kafka cluster.",
example = "gs://<BUCKET>/<KEYSTORE>.jks")
String getKafkaWriteKeystoreLocation();

void setKafkaWriteTruststoreLocation(String destinationTruststoreLocation);
void setKafkaWriteKeystoreLocation(String destinationKeystoreLocation);

@TemplateParameter.Text(
void setKafkaWritePasswordSecretId(String destinationPasswordSecretId);

@TemplateParameter.GcsReadFile(
order = 8,
optional = true,
groupName = "Destination",
parentName = "kafkaWriteAuthenticationMethod",
description = "Truststore File Location",
parentTriggerValues = {KafkaAuthenticationMethod.TLS},
helpText =
"Secret Version ID to get password to access secret in truststore, for destination kafka.",
description = "Secret Version ID of Truststore password",
example =
"projects/your-project-number/secrets/your-secret-name/versions/your-secret-version")
String getKafkaWriteTruststorePasswordSecretId();
"The Google Cloud Storage path to the Java TrustStore (JKS) file that contains the trusted certificates"
+ " to use to verify the identity of the destination Kafka broker.")
String getKafkaWriteTruststoreLocation();

void setKafkaWriteTruststorePasswordSecretId(String destinationTruststorePasswordSecretId);
void setKafkaWriteTruststoreLocation(String destinationTruststoreLocation);

@TemplateParameter.GcsReadFile(
order = 10,
@TemplateParameter.Text(
order = 9,
optional = true,
helpText =
"Cloud storage path for the Keystore location that contains the TLS certificate and private key.",
groupName = "Destination",
parentName = "kafkaWriteAuthenticationMethod",
parentTriggerValues = {KafkaAuthenticationMethod.TLS},
description = "Location of Keystore",
example = "gs://your-bucket/keystore.jks")
String getKafkaWriteKeystoreLocation();
description = "Secret Version ID of Truststore password",
helpText =
"The Google Cloud Secret Manager secret ID that contains the password to use to access the Java TrustStore (JKS) file "
+ "for TLS authentication with the destination Kafka cluster.",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
String getKafkaWriteTruststorePasswordSecretId();

void setKafkaWriteKeystoreLocation(String destinationKeystoreLocation);
void setKafkaWriteTruststorePasswordSecretId(String destinationTruststorePasswordSecretId);

@TemplateParameter.Text(
order = 11,
optional = true,
groupName = "Destination",
parentName = "kafkaWriteAuthenticationMethod",
parentTriggerValues = {KafkaAuthenticationMethod.TLS},
description = "Secret Version ID of Keystore Password",
helpText =
"Secret Version ID to get password to access secret keystore, for destination kafka.",
description = "Secret Version Version ID of Keystore Password",
example =
"projects/your-project-number/secrets/your-secret-name/versions/your-secret-version")
"The Google Cloud Secret Manager secret ID that contains the password to access the Java KeyStore (JKS) "
+ "file to use for TLS authentication with the destination Kafka cluster.",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
String getKafkaWriteKeystorePasswordSecretId();

void setKafkaWriteKeystorePasswordSecretId(String destinationKeystorePasswordSecretId);
Expand All @@ -147,11 +150,11 @@ public interface KafkaWriteOptions extends PipelineOptions {
parentName = "kafkaWriteAuthenticationMethod",
groupName = "Destination",
parentTriggerValues = {KafkaAuthenticationMethod.TLS},
description = "Secret Version ID of Private Key Password",
helpText =
"Secret Version ID of password to access private key inside the keystore, for destination Kafka.",
description = "Secret Version ID of key",
example =
"projects/your-project-number/secrets/your-secret-name/versions/your-secret-version")
"The Google Cloud Secret Manager secret ID that contains the password to use to access the private key within the "
+ "Java KeyStore (JKS) file for TLS authentication with the destination Kafka cluster.",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
String getKafkaWriteKeyPasswordSecretId();

void setKafkaWriteKeyPasswordSecretId(String destinationKeyPasswordSecretId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public interface SchemaRegistryOptions extends PipelineOptions {
KafkaTemplateParamters.MessageFormatConstants.AVRO_BINARY_ENCODING),
@TemplateParameter.TemplateEnumOption(KafkaTemplateParamters.MessageFormatConstants.JSON)
},
description = "Message Format",
description = "Kafka Message Format",
helpText =
"The Kafka message format. Can be AVRO_CONFLUENT_WIRE_FORMAT, AVRO_BINARY_ENCODING or JSON.")
"The format of the Kafka messages to read. The supported values are AVRO_CONFLUENT_WIRE_FORMAT (Confluent Schema Registry encoded Avro), AVRO_BINARY_ENCODING (Plain binary Avro), and JSON.")
@Default.String(KafkaTemplateParamters.MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
String getMessageFormat();

Expand Down Expand Up @@ -70,7 +70,9 @@ public interface SchemaRegistryOptions extends PipelineOptions {
parentTriggerValues = {KafkaTemplateParamters.SchemaFormat.SINGLE_SCHEMA_FILE},
description = "Cloud Storage path to the Avro schema file",
optional = true,
helpText = "Cloud Storage path to Avro schema file. For example, gs://MyBucket/file.avsc.")
helpText =
"The Google Cloud Storage path to the single Avro schema file used to "
+ "decode all of the messages in a topic.")
@Default.String("")
String getConfluentAvroSchemaPath();

Expand All @@ -83,7 +85,9 @@ public interface SchemaRegistryOptions extends PipelineOptions {
parentTriggerValues = {KafkaTemplateParamters.SchemaFormat.SCHEMA_REGISTRY},
description = "Schema Registry Connection URL",
optional = true,
helpText = "Schema Registry Connection URL for a registry.")
helpText =
"The URL for the Confluent Schema Registry instance used to manage Avro schemas"
+ " for message decoding.")
@Default.String("")
String getSchemaRegistryConnectionUrl();

Expand All @@ -96,7 +100,8 @@ public interface SchemaRegistryOptions extends PipelineOptions {
parentTriggerValues = {KafkaTemplateParamters.MessageFormatConstants.AVRO_BINARY_ENCODING},
description = "Cloud Storage path to the Avro schema file",
optional = true,
helpText = "Cloud Storage path to Avro schema file. For example, gs://MyBucket/file.avsc.")
helpText =
"The Google Cloud Storage path to the Avro schema file used to decode binary-encoded Avro messages.")
@Default.String("")
String getBinaryAvroSchemaPath();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ public interface KafkaToBigQueryFlexOptions
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.SASL_PLAIN),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.NONE),
},
description = "Authentication Mode",
helpText = "Kafka read authentication mode. Can be NONE or SASL_PLAIN")
description = "Kafka Read Authentication Mode",
helpText =
"The mode of authentication to use with the Kafka cluster. "
+ "Use NONE for no authentication"
+ " or SASL_PLAIN for SASL/PLAIN username and password. "
+ " Apache Kafka for BigQuery only supports the SASL_PLAIN authentication mode.")
@Default.String("NONE")
String getKafkaReadAuthenticationMode();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ public interface KafkaToGcsOptions
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.SASL_PLAIN),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.NONE),
},
description = "Authentication Mode",
helpText = "Kafka read authentication mode. Can be NONE or SASL_PLAIN")
description = "Kafka Read Authentication Mode",
helpText =
"The mode of authentication to use with the Kafka cluster. "
+ "Use NONE for no authentication and "
+ "SASL_PLAIN for SASL/PLAIN username and password. "
+ " Apache Kafka for BigQuery only supports the SASL_PLAIN authentication mode.")
@Default.String("NONE")
String getKafkaReadAuthenticationMode();

Expand Down

0 comments on commit 2ce397f

Please sign in to comment.