Kafka Connect and Kafka Streams pipeline examples have moved to quickstart-demos. Please check there for updated examples.
- Demonstrate various ways, with and without Kafka Connect, to get data into Kafka topics and then load it into the Kafka Streams API `KStream`
- Show some basic usage of the stream processing API
Uses command line `kafka-console-producer` to produce `String` keys and `String` values to a Kafka topic. Client application reads from the Kafka topic using `Serdes.String()` for both key and value.
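A rough sketch of the consuming side of this example, using the older `KStreamBuilder` API referenced elsewhere in this README; the topic name and application id below are illustrative placeholders, not necessarily what the repo's `StreamsIngest` class uses:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;

public class ConsoleProducerIngestSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "console-producer-example"); // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();
        // Both key and value were produced as Strings by kafka-console-producer,
        // so read them back with Serdes.String() on both sides.
        KStream<String, String> locations =
            builder.stream(Serdes.String(), Serdes.String(), "locations"); // topic name assumed

        locations.print();

        new KafkaStreams(builder, props).start();
    }
}
```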
KAFKA-2526: one cannot use the `--key-serializer` argument in the `kafka-console-producer` to serialize the key as a `Long`. As a result, in this example the key is serialized as a `String`. As a workaround, you could write your own `kafka.common.MessageReader` (e.g. check out the default implementation of `LineMessageReader`) and then specify the `--line-reader` argument in the `kafka-console-producer`.
Uses Kafka Connect JDBC source connector to produce JSON values, and inserts the key using single message transformations, also known as SMTs. This is helpful because, by default, the JDBC source connector does not insert a key. Client application reads from the Kafka topic using `Serdes.String()` for the key and a custom JSON Serde for the value.

KAFKA-4714: once this enhancement is implemented, the single message transform will be able to cast the type of the key to `Long`. Until then, in this example the key is written as a `String`.
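A rough sketch of what such a JSON Serde for the value could look like, built here on Jackson's `JsonNode`; the repo's actual Serde may bind to a concrete POJO instead, so treat the class names below as placeholders:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

// Illustrative JSON Serde; not the repo's implementation.
public class JsonNodeSerde {

    public static Serde<JsonNode> serde() {
        return Serdes.serdeFrom(new JsonNodeSerializer(), new JsonNodeDeserializer());
    }

    static class JsonNodeSerializer implements Serializer<JsonNode> {
        private final ObjectMapper mapper = new ObjectMapper();
        @Override public void configure(Map<String, ?> configs, boolean isKey) {}
        @Override public byte[] serialize(String topic, JsonNode data) {
            try {
                return data == null ? null : mapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new RuntimeException("Failed to serialize JSON value", e);
            }
        }
        @Override public void close() {}
    }

    static class JsonNodeDeserializer implements Deserializer<JsonNode> {
        private final ObjectMapper mapper = new ObjectMapper();
        @Override public void configure(Map<String, ?> configs, boolean isKey) {}
        @Override public JsonNode deserialize(String topic, byte[] data) {
            try {
                return data == null ? null : mapper.readTree(data);
            } catch (Exception e) {
                throw new RuntimeException("Failed to deserialize JSON value", e);
            }
        }
        @Override public void close() {}
    }
}
```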
Uses Kafka Connect JDBC source connector to produce Avro values, and null `String` keys, to a Kafka topic. Client application reads from the Kafka topic using `GenericAvroSerde` for the value and then the `map` function to convert the stream of messages to have `Long` keys and custom class values.

This example currently uses `GenericAvroSerde` and not `SpecificAvroSerde` for a specific reason. The JDBC source connector currently doesn't set a namespace when it generates a schema name for the data it is producing to Kafka. For `SpecificAvroSerde`, the lack of a namespace is a problem when trying to match reader and writer schemas, because Avro uses the writer schema name and namespace to create a classname and tries to load this class; without a namespace, the class will not be found. A workaround will be available when KAFKA-5164 is fixed, using the single message transform `SetSchemaMetadata` to add the namespace in the connector.
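A hedged sketch of the rekeying step described above. The topic name, the `id` field name, and the `GenericAvroSerde` package (here the Confluent `kafka-streams-avro-serde` artifact) are assumptions for illustration; the repo additionally converts the value into its generated `Location` class, which is omitted here:

```java
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Collections;

public class JdbcGenericAvroSketch {

    static KStream<Long, GenericRecord> rekey(KStreamBuilder builder) {
        // GenericAvroSerde needs the Schema Registry URL in order to fetch writer schemas.
        GenericAvroSerde valueSerde = new GenericAvroSerde();
        valueSerde.configure(
            Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
            false /* isKey */);

        // The connector writes null String keys and Avro values.
        KStream<String, GenericRecord> raw =
            builder.stream(Serdes.String(), valueSerde, "jdbc-locations"); // topic name assumed

        // Re-key the stream by pulling a numeric id out of each GenericRecord.
        return raw.map((key, record) ->
            KeyValue.pair((Long) record.get("id"), record)); // "id" field name assumed
    }
}
```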
Uses Java client to produce `Long` keys and `SpecificAvro` values to a Kafka topic. Client application reads from the Kafka topic using `Serdes.Long()` for the key and `SpecificAvroSerde` for the value.
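A minimal sketch of the consuming side, assuming the broker and Schema Registry addresses listed in the assumptions later in this README and a placeholder topic name; `avro.model.Location` is the class generated by the Maven build described below, and the `SpecificAvroSerde` package is assumed to come from the Confluent `kafka-streams-avro-serde` artifact:

```java
import avro.model.Location;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Collections;

public class JavaProducerIngestSketch {

    static KStream<Long, Location> buildStream(KStreamBuilder builder) {
        // SpecificAvroSerde also needs the Schema Registry URL to look up schemas.
        SpecificAvroSerde<Location> locationSerde = new SpecificAvroSerde<>();
        locationSerde.configure(
            Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
            false /* isKey */);

        // Keys were produced as Longs, values as the SpecificAvro Location class.
        return builder.stream(Serdes.Long(), locationSerde, "locations-avro"); // topic name assumed
    }
}
```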
All examples in this repo demonstrate the Kafka Streams API methods `count` and `reduce`.
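As a rough illustration only (String keys and Long values chosen purely for brevity), the two aggregations look roughly like this; note the Serdes being declared twice, which is the KAFKA-5245 point listed below:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class CountAndReduceSketch {

    static void build(KStreamBuilder builder) {
        // (1) Serdes declared when reading the topic ...
        KStream<String, Long> values =
            builder.stream(Serdes.String(), Serdes.Long(), "some-topic"); // names chosen for illustration

        // ... and (2) declared again when grouping (see the KAFKA-5245 note below).
        KTable<String, Long> countPerKey =
            values.groupByKey(Serdes.String(), Serdes.Long()).count("CountsStore");

        KTable<String, Long> sumPerKey =
            values.groupByKey(Serdes.String(), Serdes.Long())
                  .reduce((v1, v2) -> v1 + v2, "SumsStore");

        // countPerKey and sumPerKey could then be printed or written to output topics.
    }
}
```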
- KAFKA-5245: one needs to provide the Serdes twice, (1) when calling `KStreamBuilder#stream()` and (2) when calling `KStream#groupByKey()`
- PR-531: Confluent distribution provides packages for `GenericAvroSerde` and `SpecificAvroSerde`
- KAFKA-2378: adds APIs to be able to embed Kafka Connect into client applications
- You have a Kafka cluster running, with at least one Kafka broker, ZooKeeper, and Confluent Schema Registry. We assume that these services are started with the default settings. If you need help setting up the cluster, please consult the basic Confluent Platform Quickstart or the Confluent Platform on Docker Quickstart.
- By default the `timeout` command is available on most Linux distributions but not Mac OS. This `timeout` command is used by the bash scripts to terminate consumer processes after a period of time. To install it on a Mac:
# Install coreutils
brew install coreutils
# Add a "gnubin" directory to your PATH
PATH="/usr/local/opt/coreutils/libexec/gnubin:$PATH"
- NOTE: you may need to edit the file paths referenced in the bash scripts in the `scripts/` directory, particularly if you installed Confluent Platform via ZIP or TAR or are on Mac.
Compile from the pom.xml file. This also generates sources to create the `avro.model.Location` Java code:
mvn clean package
There are multiple examples of data flow as described above. Each example has a corresponding bash script in the `scripts/` directory, and the Java package name is built into the bash script name. The bash script does several things:
- Copies `files/table.locations` to `/usr/local/lib/table.locations`
- If the example uses the JDBC connector, creates a table called `locations` in a local sqlite database
- If the example uses the JDBC connector, starts Kafka Connect in standalone mode on the local machine
- Produces data to a Kafka topic (the method differs from example to example)
- Runs a `StreamsIngest` class unique to each example to read data from the Kafka topic and do some simple streams processing
# Example 1: Kafka console producer -> `String`
./scripts/run-consoleproducer.sh
# Example 2: JDBC source connector with Single Message Transformations -> JSON
./scripts/run-jdbcjson.sh
# Example 3: JDBC source connector with `GenericAvro` -> `GenericRecord`
./scripts/run-jdbcgenericavro.sh
# Example 4: Java client producer with `SpecificAvro` -> custom class
./scripts/run-javaproducer.sh
Assumptions:
- Broker is `localhost:9092`
- Confluent Schema Registry is `http://localhost:8081`
- `table.locations` is copied to `/usr/local/lib/table.locations`
- sqlite3 database is created at `/usr/local/lib/sqlite3`
To override any of the above values, you will need to make appropriate changes in the following places:
- `scripts/`: bash scripts (file path, sqlite3 database path, ZooKeeper address, Java arguments)
- `files/`: properties files for Kafka Connect
The dataset is `files/table.locations`:
1|Raleigh|300
2|Dusseldorf|100
1|Raleigh|600
3|Moscow|800
4|Sydney|200
2|Dusseldorf|400
5|Chennai|400
3|Moscow|100
3|Moscow|200
1|Raleigh|700
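For orientation, each record is a pipe-delimited line; a throwaway parse of one line could look like the sketch below, where the field meanings (id, city, count) are inferred from the sample data rather than taken from the repo's schema:

```java
public class LocationLineParser {
    public static void main(String[] args) {
        // One sample line from files/table.locations; field meanings are inferred, not documented.
        String line = "1|Raleigh|300";
        String[] parts = line.split("\\|");
        long id = Long.parseLong(parts[0]);     // assumed: record key / location id
        String city = parts[1];                 // assumed: location name
        long count = Long.parseLong(parts[2]);  // assumed: the value the aggregations operate on
        System.out.printf("id=%d city=%s count=%d%n", id, city, count);
    }
}
```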