- This plugin provides access to major core functionalities of the
com.rabbitmq:amqp-client
library.
- Integrated with coroutines and has a separate dispatcher.
- Includes a built-in connection/channel management system.
- Gives the possibility to interact directly with the java library.
- Seamlessly integrates with the Kotlin DSL, making it readable, maintainable, and easy to use.
- Installation
- Queue Binding Example
- Producer Example
- Consumer Example
- Advanced Consumer Example
- Library Calls Example
- Dead Letter Queue Example
- Logging
install(RabbitMQ) {
uri = "amqp://<user>:<password>@<address>:<port>"
defaultConnectionName = "<default_connection>"
connectionAttempts = 20
attemptDelay = 10
dispatcherThreadPollSize = 4
tlsEnabled = false
}
rabbitmq {
queueBind {
queue = "demo-queue"
exchange = "demo-exchange"
routingKey = "demo-routing-key"
queueDeclare {
queue = "demo-queue"
durable = true
}
exchangeDeclare {
exchange = "demo-exchange"
type = "direct"
}
}
}
rabbitmq {
repeat(10) {
basicPublish {
exchange = "demo-exchange"
routingKey = "demo-routing-key"
message { "Hello World!" }
}
}
}
rabbitmq {
basicConsume {
autoAck = true
queue = "demo-queue"
deliverCallback<String> { tag, message ->
logger.info("Received message: $message")
}
}
}
rabbitmq {
connection(id = "consume") {
basicConsume {
autoAck = true
queue = "demo-queue"
dispacher = Dispacher.IO
coroutinePollSize = 1_000
deliverCallback<String> { tag, message ->
logger.info("Received message: $message")
delay(30)
}
}
}
}
rabbitmq {
libChannel(id = 2) {
basicPublish("demo-queue", "demo-routing-key", null, "Hello!".toByteArray())
val consumer = object : DefaultConsumer(channel) {
override fun handleDelivery(
consumerTag: String?,
envelope: Envelope?,
properties: AMQP.BasicProperties?,
body: ByteArray?
) {
}
}
basicConsume("demo-queue", true, consumer)
}
}
rabbitmq {
libConnection(id = "lib-connection") {
val channel = createChannel()
channel.basicPublish("demo-queue", "demo-routing-key", null, "Hello!".toByteArray())
val consumer = object : DefaultConsumer(channel) {
override fun handleDelivery(
consumerTag: String?,
envelope: Envelope?,
properties: AMQP.BasicProperties?,
body: ByteArray?
) {
}
}
channel.basicConsume("demo-queue", true, consumer)
}
}
@Serializable
data class Message(
var content: String
)
fun Application.module() {
install(RabbitMQ) {
uri = "amqp://guest:guest@localhost:5672"
dispatcherThreadPollSize = 3
}
rabbitmq {
queueBind {
queue = "dlq"
exchange = "dlx"
routingKey = "dlq-dlx"
queueDeclare {
queue = "dlq"
durable = true
}
exchangeDeclare {
exchange = "dlx"
type = "direct"
}
}
queueBind {
queue = "test-queue"
exchange = "test-exchange"
queueDeclare {
queue = "test-queue"
arguments = mapOf(
"x-dead-letter-exchange" to "dlx",
"x-dead-letter-routing-key" to "dlq-dlx"
)
}
exchangeDeclare {
exchange = "test-exchange"
type = "fanout"
}
}
}
rabbitmq {
repeat(100) {
basicPublish {
exchange = "test-exchange"
message {
Message(content = "Hello world!")
}
}
}
}
rabbitmq {
basicConsume {
queue = "test-queue"
autoAck = false
deliverCallback<Message> { tag, message ->
basicReject {
deliveryTag = tag
requeue = false
}
}
}
basicConsume {
queue = "dlq"
autoAck = true
deliverCallback<Message> { tag, message ->
println("Received message in dead letter queue: $message")
}
}
}
}
- In order to set a logging level to this library add this line in
logback.xml
file:
<logger name="io.github.damir.denis.tudor.ktor.server.rabbitmq" level="DEBUG"/>