Skip to content

Commit

Permalink
Merge pull request #1643 from Dippatel98:gcs-test
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 641348878
  • Loading branch information
cloud-teleport committed Jun 7, 2024
2 parents 43219df + ccafe0b commit 8674def
Showing 1 changed file with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
Expand Down Expand Up @@ -366,6 +368,16 @@ public static PipelineResult runJsonPipeline(
String bootstrapServers,
Map<String, Object> kafkaConfig) {

// Register the coder for pipeline
FailsafeElementCoder<KV<String, String>, String> coder =
FailsafeElementCoder.of(
KvCoder.of(
NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())),
NullableCoder.of(StringUtf8Coder.of()));

CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

PCollectionTuple convertedTableRows;
convertedTableRows =
pipeline
Expand Down

0 comments on commit 8674def

Please sign in to comment.