Skip to content

Simple fault-tolerant AMQP client written in Scala and based on Akka and the RabbitMQ java client

Notifications You must be signed in to change notification settings

pongr/amqp-client

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

25 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Simple Scala AMQP client

Simple AMQP client in Scala/Akka based on the RabbitMQ java client.

Overview

This client provides a simple API for

  • publishing and consuming messages over AMQP
  • setting up RPC clients and servers
  • automatic reconnection

It is based on the Akka 2.0 framework.

RPC patterns

Typical RPC with AMQP follows this pattern:

  1. client sets up a private, exclusive response queue
  2. client sends message and set their 'replyTo' property to the name of this response queue
  3. server processes the message and replies to its 'replyTo' queue by publishing the response to the default exchange using the queue name as routing key (all queues are bound to their name on the default exchange)

Usually, you would set up several RPC servers which all use the same shared queue. The broker will load-balance messsages between these consumers using round-robin distribution, which can be combined with 'prefetch' channel settings. Setting 'prefetch' to 1 is very useful if you need resource-based (CPU, ...) load-balancing.

But you can also extend this pattern by setting up RPC servers which all use private exclusive queues bound to the same key. In this case, each server will receive the same request and will send back a response. This is very useful if you want to break a single operation into multiple, parallel steps. For example, if you want to decrypt things, you could divide the key space into N parts, set up one RPC server for each part, publish a single RPC request and wait for N responses.

This could be further extended with a simple 'workflow' pattern where each server publishes its results to the shared queue used by the next step. For example, if you want to chain steps A, B and C, set up a shared queue for each step, have 'A' processors publish to queue 'B', 'B' processors publish to queue 'C' ....

Samples

Please check ChannelOwnerSpec.scala in src/test/scala/com/aphelia/amqp/ChannelOwnerSpec.scala for more comprehensive samples

/**
 * basic consumer/producer test
 */
def testConsumer() {
  val system = ActorSystem("MySystem")
  val connFactory = new ConnectionFactory()
  connFactory.setHost("localhost")
  // create a "connection owner" actor, which will try and reconnect automatically if the connection ins lost
  val conn = system.actorOf(Props(new ConnectionOwner(connFactory)), name = "conn")
  // use the standard direct exchange
  val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true)
  // and an exclusive, private queue (name = "" means that the broker will generate a random name)
  val queue = QueueParameters(name = "", passive = false, exclusive = true)

  val foo = system.actorOf(Props(new Actor {
    def receive = {
      case Delivery(tag, envelope, properties, body) => {
        println("got a message")
        sender ! Ack(envelope.getDeliveryTag)
      }
    }
  }))
  // create a consumer that will pass all messages to the foo Actor; the consumer will declare the bindings
  val consumer = ConnectionOwner.createActor(conn, Props(new Consumer(List(Binding(exchange, queue, "my_key", autoack = false)), foo)), 5000 millis)
  val producer = ConnectionOwner.createActor(conn, Props(new ChannelOwner()))
  waitForConnection(system, consumer, producer).await()
  producer ! Publish("amq.direct", "my_key", "yo!".getBytes)
  consumer ! PoisonPill
  producer ! PoisonPill
  system.shutdown()
}

/**
 * basic transaction test
 */
def testTransactions() {
  val system = ActorSystem("MySystem")
  val connFactory = new ConnectionFactory()
  connFactory.setHost("localhost")
  // create a "connection owner" actor, which will try and reconnect automatically if the connection ins lost
  val conn = system.actorOf(Props(new ConnectionOwner(connFactory)), name = "conn")
  val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true)
  val queue = QueueParameters(name = "queue", passive = false, exclusive = false)
  val foo = system.actorOf(Props(new Actor {
    def receive = {
      case Delivery(tag, envelope, properties, body) => println("got a message")
    }
  }))
  val producer = ConnectionOwner.createActor(conn, Props(new ChannelOwner()))
  val consumer = ConnectionOwner.createActor(conn, Props(new Consumer(Binding(exchange, queue, "my_key", true) :: Nil, foo)))
  waitForConnection(system, producer, consumer).await()
  for(i <- 0 to 10) producer ! Transaction(Publish("amq.direct", "my_key", "yo".getBytes, true, false) :: Nil)
  consumer ! PoisonPill
  producer ! PoisonPill
  foo ! PoisonPill
  system.shutdown()
}

/**
 * RPC sample where each request is picked up by 2 different server and results in 2 responses
 */
def testMultipleResponses() {
  val system = ActorSystem("MySystem")
  val connFactory = new ConnectionFactory()
  connFactory.setHost("localhost")
  // create a "connection owner" actor, which will try and reconnect automatically if the connection ins lost
  val conn = system.actorOf(Props(new ConnectionOwner(connFactory)), name = "conn")

  // basic processor
  val proc = new RpcServer.IProcessor() {
    def process(delivery : Delivery) = {
      println("processing")
      Some(delivery.body)
    }
    def onFailure(delivery : Delivery, e: Exception) = Some(e.toString.getBytes)
  }
  // amq.direct is one of the standard AMQP exchanges
  val exchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true)
  // this is how you define an exclusive, private response queue. The name is empty
  // which means that the broker will generate a unique, random name when the queue is declared
  val queue = QueueParameters(name = "", passive = false, exclusive = true)
  // create 2 servers, each with its own private queue bound to the same key
  val server1 = ConnectionOwner.createActor(conn, Props(new RpcServer(queue, exchange, "my_key", proc)), 2000 millis)
  val server2 = ConnectionOwner.createActor(conn, Props(new RpcServer(queue, exchange, "my_key", proc)), 2000 millis)
  val client = ConnectionOwner.createActor(conn, Props(new RpcClient()), 2000 millis)
  waitForConnection(system, server1, server2, client).await()
  for (i <-0 to 10) {
    try {
      // send one request and wait for 2 responses
      val future = client.ask(Request(Publish("amq.direct", "my_key", "client1".getBytes) :: Nil, 2))(1000 millis)
      val result = Await.result(future, 1000 millis).asInstanceOf[Response]
      println("result : " + result)
      Thread.sleep(100)
    }
    catch {
      case e: Exception => println(e.toString)
    }
  }
  client ! PoisonPill
  server1 ! PoisonPill
  server2 ! PoisonPill
  system.shutdown()
}

About

Simple fault-tolerant AMQP client written in Scala and based on Akka and the RabbitMQ java client

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Scala 100.0%