From 0f8a79308434a5a923488f120c17ecb3d79596f3 Mon Sep 17 00:00:00 2001 From: Jayant Jain Date: Thu, 18 Jul 2024 18:04:47 +0530 Subject: [PATCH] Add null check for execution env in sink --- .../flink/bigquery/sink/BigQuerySink.java | 4 ++++ .../flink/bigquery/sink/BigQuerySinkTest.java | 19 +++++++++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQuerySink.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQuerySink.java index 19689a42..00d7f1a2 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQuerySink.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQuerySink.java @@ -41,6 +41,10 @@ public class BigQuerySink { private static final Logger LOG = LoggerFactory.getLogger(BigQuerySink.class); public static Sink get(BigQuerySinkConfig sinkConfig, StreamExecutionEnvironment env) { + if (env == null) { + LOG.error("BigQuerySink requires a valid StreamExecutionEnvironment. Found null."); + throw new IllegalArgumentException("StreamExecutionEnvironment not provided"); + } if (sinkConfig.getDeliveryGuarantee() == DeliveryGuarantee.AT_LEAST_ONCE) { return new BigQueryDefaultSink(sinkConfig); } diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQuerySinkTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQuerySinkTest.java index a38c48de..916997cb 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQuerySinkTest.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQuerySinkTest.java @@ -17,6 +17,7 @@ package com.google.cloud.flink.bigquery.sink; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.google.cloud.flink.bigquery.fakes.StorageClientFaker; import com.google.cloud.flink.bigquery.sink.serializer.FakeBigQuerySerializer; @@ -31,6 +32,8 @@ /** Tests for {@link BigQuerySink}. */ public class BigQuerySinkTest { + StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + @Test public void testGetWithAtLeastOnce() throws IOException { BigQuerySinkConfig sinkConfig = @@ -40,6 +43,18 @@ public void testGetWithAtLeastOnce() throws IOException { .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo"))) .deliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); + assertNotNull(BigQuerySink.get(sinkConfig, env)); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetWithNullStreamExecutionEnv() throws IOException { + BigQuerySinkConfig sinkConfig = + BigQuerySinkConfig.newBuilder() + .connectOptions(StorageClientFaker.createConnectOptionsForWrite(null)) + .schemaProvider(TestBigQuerySchemas.getSimpleRecordSchema()) + .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo"))) + .deliveryGuarantee(DeliveryGuarantee.NONE) + .build(); assertNotNull(BigQuerySink.get(sinkConfig, null)); } @@ -52,7 +67,7 @@ public void testGetWithNoneDeliveryGuarantee() throws IOException { .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo"))) .deliveryGuarantee(DeliveryGuarantee.NONE) .build(); - assertNotNull(BigQuerySink.get(sinkConfig, null)); + assertNotNull(BigQuerySink.get(sinkConfig, env)); } @Test(expected = UnsupportedOperationException.class) @@ -64,6 +79,6 @@ public void testExactlyOnceNotSupported() throws IOException { .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo"))) .deliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .build(); - assertNotNull(BigQuerySink.get(sinkConfig, null)); + assertNotNull(BigQuerySink.get(sinkConfig, env)); } }