Merge pull request #1202 from fozzie15:kafka_avro_changes
PiperOrigin-RevId: 589240855
cloud-teleport committed Dec 8, 2023
2 parents d0db184 + 4f529f5 commit 66bc1ad
Showing 6 changed files with 667 additions and 39 deletions.
KafkaTransform.java
@@ -16,32 +16,42 @@
package com.google.cloud.teleport.v2.kafka.transforms;

import com.google.cloud.teleport.v2.kafka.utils.SslConsumerFactoryFn;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.DeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

/** Transformations applied to the data processed in the pipeline. */
public class KafkaTransform {

/**
* Configures Kafka consumer.
* Configures a Kafka consumer that reads String values.
*
* @param bootstrapServers Kafka servers to read from
* @param topicsList Kafka topics to read from
* @param config configuration for the Kafka consumer
* @return PCollection of Kafka key/value pairs deserialized as strings
*/
public static PTransform<PBegin, PCollection<KV<String, String>>> readFromKafka(
public static PTransform<PBegin, PCollection<KV<String, String>>> readStringFromKafka(
String bootstrapServers,
List<String> topicsList,
Map<String, Object> config,
@@ -61,6 +71,34 @@ public static PTransform<PBegin, PCollection<KV<String, String>>> readFromKafka(
return kafkaRecords.withoutMetadata();
}

/**
* Configures a Kafka consumer that reads Avro GenericRecord values.
*
* @param bootstrapServers Kafka servers to read from
* @param topicsList Kafka topics to read from
* @param config configuration for the Kafka consumer
* @param avroSchema path to the Avro schema used to deserialize record values
* @param sslConfig optional SSL configuration for the consumer; may be null
* @return PCollection of Kafka key/value pairs with byte[] keys and Avro GenericRecord values
*/
public static PTransform<PBegin, PCollection<KV<byte[], GenericRecord>>> readAvroFromKafka(
String bootstrapServers,
List<String> topicsList,
Map<String, Object> config,
String avroSchema,
@Nullable Map<String, String> sslConfig) {
KafkaIO.Read<byte[], GenericRecord> kafkaRecords =
KafkaIO.<byte[], GenericRecord>read()
.withBootstrapServers(bootstrapServers)
.withTopics(topicsList)
.withKeyDeserializerAndCoder(
ByteArrayDeserializer.class, NullableCoder.of(ByteArrayCoder.of()))
.withValueDeserializer(new KafkaSchemaDeserializerProvider(avroSchema))
.withConsumerConfigUpdates(config);
if (sslConfig != null) {
kafkaRecords = kafkaRecords.withConsumerFactoryFn(new SslConsumerFactoryFn(sslConfig));
}
return kafkaRecords.withoutMetadata();
}

/**
* The {@link MessageToFailsafeElementFn} wraps a Kafka message with the {@link FailsafeElement}
* class so errors can be recovered from and the original message can be output to an error records
@@ -75,4 +113,31 @@ public void processElement(ProcessContext context) {
context.output(FailsafeElement.of(message, message.getValue()));
}
}

static class KafkaSchemaDeserializerProvider implements DeserializerProvider<GenericRecord> {

private String avroSchemaPath;
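// Resolved lazily and kept transient so only the schema path is serialized with
// the pipeline; each worker re-reads the schema on first use.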
private transient Schema avroSchema;

public KafkaSchemaDeserializerProvider(String avroSchemaPath) {
this.avroSchemaPath = avroSchemaPath;
}

@Override
public Deserializer<GenericRecord> getDeserializer(Map<String, ?> configs, boolean isKey) {
return new SchemaKafkaAvroDeserializer(getAvroSchema(), configs);
}

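// Beam needs a coder for the decoded values; NullableCoder also tolerates
// null (tombstone) record values.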
@Override
public Coder<GenericRecord> getCoder(CoderRegistry coderRegistry) {
return NullableCoder.of(AvroCoder.of(getAvroSchema()));
}

protected synchronized Schema getAvroSchema() {
if (this.avroSchema == null) {
this.avroSchema = SchemaUtils.getAvroSchema(avroSchemaPath);
}
return this.avroSchema;
}
}
}
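For orientation, a minimal usage sketch (not part of this commit) showing how the new readAvroFromKafka transform could be wired into a Beam pipeline; the broker address, topic, consumer setting, and schema path below are hypothetical placeholders.

import com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class ReadAvroFromKafkaExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put("auto.offset.reset", "earliest"); // start from the oldest records

    PCollection<KV<byte[], GenericRecord>> records =
        pipeline.apply(
            "ReadAvroFromKafka",
            KafkaTransform.readAvroFromKafka(
                "broker-1:9092",                      // placeholder bootstrap server
                Arrays.asList("my-avro-topic"),       // placeholder topic
                consumerConfig,
                "gs://my-bucket/schemas/record.avsc", // placeholder schema path
                null));                               // no SSL config

    pipeline.run();
  }
}

The keys arrive as nullable byte[] and the values as GenericRecord decoded with the schema read from the given path.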
SchemaKafkaAvroDeserializer.java (new file)
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.kafka.transforms;

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Deserializer;

public class SchemaKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
implements Deserializer<GenericRecord> {
private boolean isKey;
private Schema schema;

public SchemaKafkaAvroDeserializer() {}

public SchemaKafkaAvroDeserializer(Schema schema) {
this.schema = schema;
}

public SchemaKafkaAvroDeserializer(Schema schema, Map<String, ?> props) {
// Here we use a mock schema registry since we always want to return the same schema instance.
this.schemaRegistry = new MockSchemaRegistryClient();
try {
this.schemaRegistry.register("subject", schema);
} catch (Exception e) {
throw new RuntimeException("Error registering schema", e);
}
}

public void configure(Map<String, ?> configs, boolean isKey) {
this.isKey = isKey;
this.configure(new KafkaAvroDeserializerConfig(configs));
}

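// The payload is expected in the Confluent wire format (one magic byte, a
// four-byte schema id, then the Avro-encoded record), which the base class decodes.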
public GenericRecord deserialize(String s, byte[] bytes) {
return (GenericRecord) this.deserialize(bytes);
}

public GenericRecord deserialize(String s, byte[] bytes, Schema readerSchema) {
return (GenericRecord) this.deserialize(bytes, readerSchema);
}

public void close() {}
}
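To make that framing concrete, a hypothetical round-trip sketch (again, not part of this commit): it builds a value the way Confluent-style serializers frame Avro records and feeds it to the new deserializer. Schema id 1 assumes the record's schema is the first one registered with the mock registry, and the record shape is invented for illustration.

package com.google.cloud.teleport.v2.kafka.transforms; // alongside the deserializer for access

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class WireFormatSketch {
  public static void main(String[] args) throws Exception {
    Schema schema =
        new Schema.Parser()
            .parse(
                "{\"type\":\"record\",\"name\":\"User\","
                    + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
    GenericRecord record = new GenericData.Record(schema);
    record.put("name", "alice");

    // Confluent wire format: magic byte 0, a 4-byte big-endian schema id, then
    // the Avro binary payload. Id 1 assumes the first mock-registry registration.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0);
    out.write(ByteBuffer.allocate(4).putInt(1).array());
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();

    SchemaKafkaAvroDeserializer deserializer = new SchemaKafkaAvroDeserializer(schema, null);
    System.out.println(deserializer.deserialize("my-avro-topic", out.toByteArray()));
  }
}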
