diff --git a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQuery.java b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQuery.java index 2ae1ddee94..f85543cdbb 100644 --- a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQuery.java +++ b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQuery.java @@ -123,7 +123,8 @@ }, streaming = true, supportsAtLeastOnce = true, - supportsExactlyOnce = true) + supportsExactlyOnce = true, + hidden = true) public class KafkaToBigQuery { /* Logger for class. */ diff --git a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java index 0be5b48bcb..183ff0f7fb 100644 --- a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java +++ b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java @@ -108,8 +108,8 @@ "keystoreLocation", "sourceSSL", "sourceTruststorePassword", + "sourceKey", "sourceKeystorePassword", - "sourceKey" }) public class KafkaToBigQueryFlex { diff --git a/v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/transforms/AvroWriteTransform.java b/v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/transforms/AvroWriteTransform.java index 192a12d6a5..16375e1141 100644 --- a/v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/transforms/AvroWriteTransform.java +++ b/v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/transforms/AvroWriteTransform.java @@ -85,12 +85,14 @@ public WriteFilesResult expand( KafkaTemplateParamters.MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)) { String schemaRegistryURL = schemaRegistryURL(); String schemaPath = schemaPath(); - if (schemaRegistryURL == null && schemaPath == null) { + assert schemaRegistryURL != null; + assert schemaPath != null; + if (schemaRegistryURL.isBlank() && schemaPath.isBlank()) { throw new UnsupportedOperationException( - "A Schema Registry URL or static schemas are required for CONFLUENT_WIRE_FORMAT messages"); + "A Schema Registry URL or static schema is required for CONFLUENT_WIRE_FORMAT messages"); } DoFn, GenericRecord> convertToBytes; - if (schemaRegistryURL != null) { + if (!schemaRegistryURL.isBlank()) { convertToBytes = new ConvertBytesToGenericRecord(schemaRegistryURL); } else { convertToBytes = new ConvertBytesToGenericRecord(schemaPath, true); diff --git a/v2/kafka-to-gcs/src/test/java/com/google/cloud/teleport/v2/templates/KafkaToGcsIT.java b/v2/kafka-to-gcs/src/test/java/com/google/cloud/teleport/v2/templates/KafkaToGcsIT.java index 10c7ab8590..81ee96a329 100644 --- a/v2/kafka-to-gcs/src/test/java/com/google/cloud/teleport/v2/templates/KafkaToGcsIT.java +++ b/v2/kafka-to-gcs/src/test/java/com/google/cloud/teleport/v2/templates/KafkaToGcsIT.java @@ -111,7 +111,8 @@ private void baseKafkaToGcs(Function + ";" + topicName) .addParameter("windowDuration", "10s") - .addParameter("schemaPath", getGcsPath("avro_schema.avsc")) + .addParameter("schemaFormat", "SINGLE_SCHEMA_FILE") + .addParameter("confluentAvroSchemaPath", getGcsPath("avro_schema.avsc")) .addParameter("kafkaReadOffset", "earliest") .addParameter("outputDirectory", getGcsPath(testName)) .addParameter("outputFilenamePrefix", testName + "-")