This asset covers the following topics:
- Setting up Kafka
- Integrating MuleSoft with Kafka through a sample Mule application that uses the out-of-the-box Apache Kafka Connector, freely available on Anypoint Exchange
The following steps set up a running on-premises Kafka instance, which is used to store our events.
Download Kafka from https://kafka.apache.org/downloads and extract it:
$ tar -xzf kafka_2.13-2.6.0.tgz
$ cd kafka_2.13-2.6.0
NOTE: Your local environment must have OpenJDK 8 or 11 installed.
Run the following commands to start all services in the correct order:
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Open another terminal session and run:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
Open a new terminal session and create the topic:
$ bin/kafka-topics.sh --create --topic <YOUR TOPIC NAME> --bootstrap-server localhost:9092
The application implements the Producer-Consumer pattern. It is composed of two main flows:
- The Producer flow is triggered by a POST request to the localhost:8081/api/events endpoint, whose body contains the message to store, e.g.
{
"message" : "Message pushed into the queue"
}
- The Consumer flow is triggered automatically by the Message Listener operation of the Apache Kafka Connector. It retrieves the message from the queue and logs it to the console.
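The division of work between the two flows can be illustrated with a minimal Python sketch. This is not how the Mule application is implemented: the in-memory queue below is a hypothetical stand-in for the Kafka topic, and the function names producer_flow and consumer_flow are chosen only to mirror the flow names above.

```python
import queue
import threading

# Hypothetical in-memory stand-in for the Kafka topic. The real application
# talks to Kafka through the Apache Kafka Connector, not a Python queue.
topic = queue.Queue()
STOP = object()  # sentinel that tells the consumer to shut down


def producer_flow(message):
    """Mimic the Producer flow: store the received message in the queue."""
    topic.put({"message": message})


def consumer_flow(log):
    """Mimic the Consumer flow: take each event off the queue and log it."""
    while True:
        event = topic.get()  # blocks until an event is available
        if event is STOP:
            break
        log.append(event)


logged = []
consumer = threading.Thread(target=consumer_flow, args=(logged,))
consumer.start()

producer_flow("Message pushed into the queue")
topic.put(STOP)
consumer.join()

print(logged)
```

The point of the sketch is the decoupling: the producer returns as soon as the event is stored, while the consumer reacts to new events on its own, which is the role the Message Listener plays in the Mule application.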
NOTE: Make sure both services (ZooKeeper and the Kafka broker) are running.
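One way to check this before starting the application is to test whether the two ports accept TCP connections. The sketch below assumes the default ports: 2181 for ZooKeeper (the clientPort in the stock config/zookeeper.properties) and 9092 for the broker, as used in the topic creation command. The helper names are illustrative, and an open port only shows that something is listening, not that the service is healthy.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeout, or name resolution failure.
        return False


def check_services(host="localhost"):
    """Probe the assumed default ports of ZooKeeper and the Kafka broker."""
    return {
        "zookeeper": is_listening(host, 2181),     # assumed default clientPort
        "kafka broker": is_listening(host, 9092),  # port used when creating the topic
    }


if __name__ == "__main__":
    for name, up in check_services().items():
        print(f"{name}: {'listening' if up else 'not reachable'}")
```

If either entry is reported as not reachable, start the corresponding service in its own terminal session before running the Mule application.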
- Clone the repository and import the project into Anypoint Studio
- Modify the kafka.topic field of the test-properties.yaml file (located in src/main/resources) so that it references <YOUR TOPIC NAME>
- Run the application
- Push a message into the queue using a REST client (e.g. Postman, Advanced REST Client) by sending a POST request containing the data to store to the localhost:8081/api/events endpoint, e.g.
{
"message" : "Message pushed into the queue"
}
- In the Anypoint Studio console you should find the event, retrieved from the queue and logged, e.g.
INFO 2020-11-30 15:00:00,000 [[MuleRuntime].uber.04: [mule-kafka-integration].consumerFlow.CPU_LITE @43b544] [processor: consumerFlow/processors/0; event: 9b716bc0-331a-11eb-aaba-3c22fb13cfba] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: {
"message" : "Message pushed into the queue"
}
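As an alternative to a graphical REST client, the POST request from the steps above could be scripted. The following is a minimal sketch using only the Python standard library. The http:// scheme and the Content-Type: application/json header are assumptions, since the steps only give the host, port and path, and the function names are illustrative.

```python
import json
import urllib.request

# Endpoint from the steps above; the http:// scheme is an assumption.
EVENTS_URL = "http://localhost:8081/api/events"


def build_event_request(message, url=EVENTS_URL):
    """Build the POST request that carries the message as a JSON body."""
    body = json.dumps({"message": message}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},  # assumed content type
        method="POST",
    )


def push_event(message, url=EVENTS_URL):
    """Send the request and return the HTTP status code."""
    request = build_event_request(message, url)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


# Building the request does not touch the network, so it can be inspected
# before anything is sent.
request = build_event_request("Message pushed into the queue")
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

With the Mule application running, calling push_event("Message pushed into the queue") sends the message, after which the corresponding log entry should appear in the Anypoint Studio console.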