Skip to content

DamirDenis-Tudor/ktor-server-rabbitmq

Repository files navigation

Deployment Status Pull Request Checks

Overview

  • This plugin provides access to major core functionalities of the com.rabbitmq:amqp-client library.

Features

  • Integrated with coroutines and has a separate dispatcher.
  • Includes a built-in connection/channel management system.
  • Gives the possibility to interact directly with the java library.
  • Seamlessly integrates with the Kotlin DSL, making it readable, maintainable, and easy to use.

Table of Contents

  1. Installation
  2. Queue Binding Example
  3. Producer Example
  4. Consumer Example
  5. Advanced Consumer Example
  6. Library Calls Example
  7. Dead Letter Queue Example
  8. Logging

Usage

Installation

install(RabbitMQ) {
    uri = "amqp://<user>:<password>@<address>:<port>"
    defaultConnectionName = "<default_connection>"
    connectionAttempts = 20
    attemptDelay = 10
    dispatcherThreadPollSize = 4
    tlsEnabled = false
}

Queue binding example

rabbitmq {
    queueBind {
        queue = "demo-queue"
        exchange = "demo-exchange"
        routingKey = "demo-routing-key"
        queueDeclare {
            queue = "demo-queue"
            durable = true
        }
        exchangeDeclare {
            exchange = "demo-exchange"
            type = "direct"
        }
    }
}

Producer example

rabbitmq {
    repeat(10) {
        basicPublish {
            exchange = "demo-exchange"
            routingKey = "demo-routing-key"
            message { "Hello World!" }
        }
    }
}

Consumer Example

rabbitmq {
    basicConsume {
        autoAck = true
        queue = "demo-queue"
        deliverCallback<String> { tag, message ->
            logger.info("Received message: $message")
        }
    }
}

Consumer Example with coroutinePollSize

rabbitmq {
    connection(id = "consume") {
        basicConsume {
            autoAck = true
            queue = "demo-queue"
            dispacher = Dispacher.IO
            coroutinePollSize = 1_000
            deliverCallback<String> { tag, message ->
                logger.info("Received message: $message")
                delay(30)
            }
        }
    }
}

Library Calls Example

rabbitmq {
    libChannel(id = 2) {
        basicPublish("demo-queue", "demo-routing-key", null, "Hello!".toByteArray())

        val consumer = object : DefaultConsumer(channel) {
            override fun handleDelivery(
                consumerTag: String?,
                envelope: Envelope?,
                properties: AMQP.BasicProperties?,
                body: ByteArray?
            ) {

            }
        }

        basicConsume("demo-queue", true, consumer)
    }
}
rabbitmq {
    libConnection(id = "lib-connection") {
        val channel = createChannel()

        channel.basicPublish("demo-queue", "demo-routing-key", null, "Hello!".toByteArray())

        val consumer = object : DefaultConsumer(channel) {
            override fun handleDelivery(
                consumerTag: String?,
                envelope: Envelope?,
                properties: AMQP.BasicProperties?,
                body: ByteArray?
            ) {

            }
        }

        channel.basicConsume("demo-queue", true, consumer)
    }
}

Dead Letter Queue Example

@Serializable
data class Message(
    var content: String
)

fun Application.module() {
    install(RabbitMQ) {
        uri = "amqp://guest:guest@localhost:5672"
        dispatcherThreadPollSize = 3
    }

    rabbitmq {
        queueBind {
            queue = "dlq"
            exchange = "dlx"
            routingKey = "dlq-dlx"
            queueDeclare {
                queue = "dlq"
                durable = true
            }
            exchangeDeclare {
                exchange = "dlx"
                type = "direct"
            }
        }

        queueBind {
            queue = "test-queue"
            exchange = "test-exchange"
            queueDeclare {
                queue = "test-queue"
                arguments = mapOf(
                    "x-dead-letter-exchange" to "dlx",
                    "x-dead-letter-routing-key" to "dlq-dlx"
                )
            }
            exchangeDeclare {
                exchange = "test-exchange"
                type = "fanout"
            }
        }
    }

    rabbitmq {
        repeat(100) {
            basicPublish {
                exchange = "test-exchange"
                message {
                    Message(content = "Hello world!")
                }
            }
        }
    }

    rabbitmq {
        basicConsume {
            queue = "test-queue"
            autoAck = false
            deliverCallback<Message> { tag, message ->
                basicReject {
                    deliveryTag = tag
                    requeue = false
                }
            }
        }

        basicConsume {
            queue = "dlq"
            autoAck = true
            deliverCallback<Message> { tag, message ->
                println("Received message in dead letter queue: $message")
            }
        }
    }
}

Logging

  • In order to set a logging level to this library add this line in logback.xml file:
<logger name="io.github.damir.denis.tudor.ktor.server.rabbitmq" level="DEBUG"/>