Skip to content

Commit

Permalink
throw BigQueryConnector Exception in case of schema mismatch.
Browse files Browse the repository at this point in the history
  • Loading branch information
prashastia committed Sep 30, 2024
1 parent 8549563 commit 7f08506
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A simple Identity de-serialization for pipelines that just want {@link GenericRecord} as response
Expand All @@ -34,14 +35,24 @@ public class AvroDeserializationSchema
implements BigQueryDeserializationSchema<GenericRecord, GenericRecord> {

private final String avroSchemaString;
private static final Logger LOG = LoggerFactory.getLogger(AvroDeserializationSchema.class);

public AvroDeserializationSchema(String avroSchemaString) {
this.avroSchemaString = avroSchemaString;
}

@Override
public GenericRecord deserialize(GenericRecord record) throws IOException {
return record;
public GenericRecord deserialize(GenericRecord record) throws BigQueryConnectorException {
try {
return record;
} catch (RuntimeException e) {
LOG.error(
String.format(
"Error deserializing Avro Generic Record %s to Avro Generic Record.%nError: %s.%nCause:%s ",
record.toString(), e.getMessage(), e.getCause()));
throw new BigQueryConnectorException(
"Error in deserializing to Avro Generic Record", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,36 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Simple implementation for the Deserialization schema (from Avro GenericRecord to RowData). */
@Internal
public class AvroToRowDataDeserializationSchema
implements BigQueryDeserializationSchema<GenericRecord, RowData> {
private final AvroToRowDataConverters.AvroToRowDataConverter converter;
private final TypeInformation<RowData> typeInfo;
private static final Logger LOG =
LoggerFactory.getLogger(AvroToRowDataDeserializationSchema.class);

public AvroToRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> typeInfo) {
this.converter = AvroToRowDataConverters.createRowConverter(rowType);
this.typeInfo = typeInfo;
}

@Override
public RowData deserialize(GenericRecord record) throws IOException {
return (GenericRowData) converter.convert(record);
public RowData deserialize(GenericRecord record) throws BigQueryConnectorException {
try {
return (GenericRowData) converter.convert(record);
} catch (RuntimeException e) {
LOG.error(
String.format(
"Error deserializing Avro Generic Record %s to Row Data.%nError: %s.%nCause:%s ",
record.toString(), e.getMessage(), e.getCause()));
throw new BigQueryConnectorException("Error in deserializing to Row Data", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;

import java.io.IOException;
import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;

/**
Expand All @@ -34,14 +37,16 @@
public interface BigQueryDeserializationSchema<IN, OUT>
extends Serializable, ResultTypeQueryable<OUT> {

Logger LOG = LoggerFactory.getLogger(BigQueryDeserializationSchema.class);

/**
* De-serializes the IN type record.
*
* @param record The BSON document to de-serialize.
* @return The de-serialized message as an object (null if the message cannot be de-serialized).
* @throws java.io.IOException In case of problems while de-serializing.
* @throws BigQueryConnectorException In case of problems while de-serializing.
*/
OUT deserialize(IN record) throws IOException;
OUT deserialize(IN record) throws BigQueryConnectorException;

/**
* De-serializes the IN type record.
Expand All @@ -54,10 +59,19 @@ public interface BigQueryDeserializationSchema<IN, OUT>
* @param record The IN document to de-serialize.
* @param out The collector to put the resulting messages.
*/
default void deserialize(IN record, Collector<OUT> out) throws IOException {
default void deserialize(IN record, Collector<OUT> out) throws BigQueryConnectorException {
OUT deserialize = deserialize(record);
if (deserialize != null) {
out.collect(deserialize);
try {
if (deserialize != null) {
out.collect(deserialize);
}
} catch (RuntimeException e) {
LOG.error(
String.format(
"Failed to forward the deserialized record %s to the next operator.%nError %s%nCause %s",
deserialize, e.getMessage(), e.getCause()));
throw new BigQueryConnectorException(
"Failed to forward the deserialized record to the next operator.");
}
}
}

0 comments on commit 7f08506

Please sign in to comment.