KoP implements the Schema Registry to support Kafka clients that use Confluent's schema serializers and deserializers.
To enable the Schema Registry, you should add the following configuration:
kopSchemaRegistryEnable=true
By default, the Schema Registry listens on port 8001, so you should add the following property when creating your Kafka producer or consumer.
schema.registry.url=http://<broker-ip>:8001
Note: For other configurations, such as how to change the port that the Schema Registry listens on, see the configuration guide. To manage schemas, see Confluent's Schema REST API.
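For example, assuming KoP exposes the Confluent-compatible registration endpoint, a schema could be registered manually with a request like the following (my-subject-value is a placeholder subject name):

$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8001/subjects/my-subject-value/versions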
This section provides an example of how to use Confluent's Avro serializer with KoP.
Start KoP in standalone mode with the following properties. For details, see Set configuration for KoP.
messagingProtocols=kafka
kafkaListeners=PLAINTEXT://0.0.0.0:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
allowAutoTopicCreationType=partitioned
# Enable the transaction to be compatible with Kafka clients 3.2.0 or later
# See https://kafka.apache.org/documentation/#upgrade_320_notable
kafkaTransactionCoordinatorEnabled=true
brokerDeduplicationEnabled=true
kopSchemaRegistryEnable=true
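As a minimal sketch of starting it (assuming the KoP protocol handler NAR has already been installed in the Pulsar protocol handler directory and the properties above have been appended to conf/standalone.conf):

$ bin/pulsar standalone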
Create an Avro schema file named `User.avsc` under the `src/main/avro` directory:
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
Add the `avro-maven-plugin` to generate the Java class from the Avro schema file above.
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.10.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
        <includes>
          <include>User.avsc</include>
        </includes>
        <outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
With the Maven plugin above, a `User` class that has a `String` field named `name` and an `int` field named `age` will be generated under the `example.avro` package.
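Running mvn compile (or any phase at or after generate-sources) triggers the code generation. As a minimal sketch of the generated API (the exact members depend on the Avro version), the class implements Avro's SpecificRecord and provides an all-args constructor as well as a builder:

// Sketch of using the class generated from User.avsc
User alice = new User("alice", 10);   // all-args constructor
User bob = User.newBuilder()          // builder-style construction
        .setName("bob")
        .setAge(20)
        .build();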
- Add the following dependencies to use Kafka client 3.3.1 with Confluent's Avro serializer 7.3.1:
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.3.1</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.3.1</version>
</dependency>
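Note that Confluent's artifacts are hosted on Confluent's Maven repository rather than Maven Central, so you may also need to add the repository to your pom.xml:

<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>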
- Run the following producer application code:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import example.avro.User;
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8001");
final String topic = "my-avro-kafka-topic";
final KafkaProducer<String, User> producer = new KafkaProducer<>(props);
// send() returns a Future<RecordMetadata>; get() blocks until the broker acknowledges the record
RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, new User("alice", 10))).get();
System.out.println("Sent to " + metadata);
producer.close();
You will see the following output:
Sent to my-avro-kafka-topic-0@0
- Run the following consumer application:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import example.avro.User;
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8001");
// Deserialize into the generated User class instead of a GenericRecord
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
final KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("my-avro-kafka-topic"));
while (true) {
    ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(1));
    records.forEach(record -> System.out.println("Received " + record.value() + " from "
            + record.topic() + "-" + record.partition() + "@" + record.offset()));
    if (!records.isEmpty()) break;
}
consumer.close();
You will see the following output:
Received {"name": "alice", "age": 10} from my-avro-kafka-topic-0@0
The producer's serializer registered the schema automatically on the first send. Query the schema whose ID is 1 (i.e., the first schema):
$ curl -L http://localhost:8001/schemas/ids/1; echo
{
  "schema" : "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"
}
Query the subjects:
$ curl -L http://localhost:8001/subjects; echo
[ "my-avro-kafka-topic-value" ]
$ curl -L http://localhost:8001/subjects/my-avro-kafka-topic-value/versions; echo
[ 1 ]
$ curl -L http://localhost:8001/subjects/my-avro-kafka-topic-value/versions/1; echo
{
  "id" : 1,
  "schema" : "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}",
  "subject" : "my-avro-kafka-topic-value",
  "version" : 1,
  "type" : "AVRO"
}
For a quick start on Confluent's schema concepts, see the Confluent documentation.