Skip to content

Commit

Permalink
Fix KafkaToGcsIT (#1618)
Browse files Browse the repository at this point in the history
* Fix test

* Mark old kafka to bq template hidden

* Spotless

* Change if condition to check for schema path or schema registry

* Add exception

* Revert "Add exception"

This reverts commit a198f49.

* Dummy change
  • Loading branch information
AnandInguva authored May 30, 2024
1 parent 2ce397f commit 4319ce6
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@
},
streaming = true,
supportsAtLeastOnce = true,
supportsExactlyOnce = true)
supportsExactlyOnce = true,
hidden = true)
public class KafkaToBigQuery {

/* Logger for class. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@
"keystoreLocation",
"sourceSSL",
"sourceTruststorePassword",
"sourceKey",
"sourceKeystorePassword",
"sourceKey"
})
public class KafkaToBigQueryFlex {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,14 @@ public WriteFilesResult<AvroDestination> expand(
KafkaTemplateParamters.MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)) {
String schemaRegistryURL = schemaRegistryURL();
String schemaPath = schemaPath();
if (schemaRegistryURL == null && schemaPath == null) {
assert schemaRegistryURL != null;
assert schemaPath != null;
if (schemaRegistryURL.isBlank() && schemaPath.isBlank()) {
throw new UnsupportedOperationException(
"A Schema Registry URL or static schemas are required for CONFLUENT_WIRE_FORMAT messages");
"A Schema Registry URL or static schema is required for CONFLUENT_WIRE_FORMAT messages");
}
DoFn<KafkaRecord<byte[], byte[]>, GenericRecord> convertToBytes;
if (schemaRegistryURL != null) {
if (!schemaRegistryURL.isBlank()) {
convertToBytes = new ConvertBytesToGenericRecord(schemaRegistryURL);
} else {
convertToBytes = new ConvertBytesToGenericRecord(schemaPath, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ private void baseKafkaToGcs(Function<LaunchConfig.Builder, LaunchConfig.Builder>
+ ";"
+ topicName)
.addParameter("windowDuration", "10s")
.addParameter("schemaPath", getGcsPath("avro_schema.avsc"))
.addParameter("schemaFormat", "SINGLE_SCHEMA_FILE")
.addParameter("confluentAvroSchemaPath", getGcsPath("avro_schema.avsc"))
.addParameter("kafkaReadOffset", "earliest")
.addParameter("outputDirectory", getGcsPath(testName))
.addParameter("outputFilenamePrefix", testName + "-")
Expand Down

0 comments on commit 4319ce6

Please sign in to comment.