diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/reader/deserializer/AvroDeserializationSchema.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/reader/deserializer/AvroDeserializationSchema.java index c94b7755..a0c3935e 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/reader/deserializer/AvroDeserializationSchema.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/reader/deserializer/AvroDeserializationSchema.java @@ -20,10 +20,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; - -import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A simple Identity de-serialization for pipelines that just want {@link GenericRecord} as response @@ -34,14 +35,24 @@ public class AvroDeserializationSchema implements BigQueryDeserializationSchema { private final String avroSchemaString; + private static final Logger LOG = LoggerFactory.getLogger(AvroDeserializationSchema.class); public AvroDeserializationSchema(String avroSchemaString) { this.avroSchemaString = avroSchemaString; } @Override - public GenericRecord deserialize(GenericRecord record) throws IOException { - return record; + public GenericRecord deserialize(GenericRecord record) throws BigQueryConnectorException { + try { + return record; + } catch (RuntimeException e) { + LOG.error( + String.format( + "Error deserializing Avro Generic Record %s to Avro Generic Record.%nError: %s.%nCause:%s ", + record.toString(), e.getMessage(), e.getCause())); + throw new 
BigQueryConnectorException( + "Error in deserializing to Avro Generic Record", e); + } } @Override diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/reader/deserializer/AvroToRowDataDeserializationSchema.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/reader/deserializer/AvroToRowDataDeserializationSchema.java index 3f316e48..f437da5d 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/reader/deserializer/AvroToRowDataDeserializationSchema.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/reader/deserializer/AvroToRowDataDeserializationSchema.java @@ -22,9 +22,10 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; +import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException; import org.apache.avro.generic.GenericRecord; - -import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Simple implementation for the Deserialization schema (from Avro GenericRecord to RowData). 
*/ @Internal @@ -32,6 +33,8 @@ public class AvroToRowDataDeserializationSchema implements BigQueryDeserializationSchema { private final AvroToRowDataConverters.AvroToRowDataConverter converter; private final TypeInformation typeInfo; + private static final Logger LOG = + LoggerFactory.getLogger(AvroToRowDataDeserializationSchema.class); public AvroToRowDataDeserializationSchema(RowType rowType, TypeInformation typeInfo) { this.converter = AvroToRowDataConverters.createRowConverter(rowType); @@ -39,8 +42,16 @@ public AvroToRowDataDeserializationSchema(RowType rowType, TypeInformation extends Serializable, ResultTypeQueryable { + Logger LOG = LoggerFactory.getLogger(BigQueryDeserializationSchema.class); + /** * De-serializes the IN type record. * * @param record The BSON document to de-serialize. * @return The de-serialized message as an object (null if the message cannot be de-serialized). - * @throws java.io.IOException In case of problems while de-serializing. + * @throws BigQueryConnectorException In case of problems while de-serializing. */ - OUT deserialize(IN record) throws IOException; + OUT deserialize(IN record) throws BigQueryConnectorException; /** * De-serializes the IN type record. @@ -54,10 +59,19 @@ public interface BigQueryDeserializationSchema * @param record The IN document to de-serialize. * @param out The collector to put the resulting messages. 
*/ - default void deserialize(IN record, Collector out) throws IOException { + default void deserialize(IN record, Collector out) throws BigQueryConnectorException { OUT deserialize = deserialize(record); - if (deserialize != null) { - out.collect(deserialize); + try { + if (deserialize != null) { + out.collect(deserialize); + } + } catch (RuntimeException e) { + LOG.error( + String.format( + "Failed to forward the deserialized record %s to the next operator.%nError %s%nCause %s", + deserialize, e.getMessage(), e.getCause())); + throw new BigQueryConnectorException( + "Failed to forward the deserialized record to the next operator.", e); } } }