From ccafe0bd32dd4d8c16461c0122d5b384a6dc4a54 Mon Sep 17 00:00:00 2001 From: Dippatel98 Date: Fri, 7 Jun 2024 18:34:51 +0000 Subject: [PATCH] Fix bug in Kafka To BigQuery Template --- .../teleport/v2/templates/KafkaToBigQueryFlex.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java index 0024f10cae..89541a114d 100644 --- a/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java +++ b/v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java @@ -43,6 +43,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; @@ -366,6 +368,16 @@ public static PipelineResult runJsonPipeline( String bootstrapServers, Map kafkaConfig) { + // Register the coder for pipeline + FailsafeElementCoder, String> coder = + FailsafeElementCoder.of( + KvCoder.of( + NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())), + NullableCoder.of(StringUtf8Coder.of())); + + CoderRegistry coderRegistry = pipeline.getCoderRegistry(); + coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder); + PCollectionTuple convertedTableRows; convertedTableRows = pipeline