MarcPer/cuniculus

Ruby job queue backed by RabbitMQ. The word cuniculus comes from the scientific name of the European rabbit (Oryctolagus cuniculus).

Getting started

gem install cuniculus

The following minimal example assumes RabbitMQ is running on localhost:5672; see the configuration section for how to change this.

Create a worker class:

# -- my_worker.rb
require 'cuniculus/worker'

class MyWorker
  extend Cuniculus::Worker

  # the queue name is not explicitly given, so "cun_default" is used.

  def perform(arg1, arg2)
    puts "Processing:"
    puts "arg1: #{arg1.inspect}"
    puts "arg2: #{arg2.inspect}"
  end
end

Add jobs to queue:

MyWorker.perform_async('x', [1, 2, 3])

Start the job consumer:

cuniculus -r my_worker.rb

Benchmarks

The following measurements were performed with the bin/run_benchmarks utility, with different command parameters. Run it with -h to see its usage.

To simulate network latency, Toxiproxy was used. It needs to be started with toxiproxy-server before running the benchmarks.

Network latency (ms) | Prefetch count       | Throughput (jobs/s) | Average job latency (ms)
1                    | 65535 (max. allowed) | 10225               | 2
10                   | 65535 (max. allowed) | 9990                | 13
1                    | 50                   | 8051                | 2
10                   | 50                   | 2500                | 13
100                  | 50                   | 481                 | 103
1                    | 10 (default)         | 5266                | 2
10                   | 10 (default)         | 807                 | 13
1                    | 1                    | 481                 | 2
10                   | 1                    | 81                  | 13

Additional benchmark parameters:

  • throughput was measured by consuming 100k jobs;
  • job latency was averaged over 200 samples;
  • Ruby 2.7.2 was used.

Several remarks can be made:

  • Higher prefetch counts lead to higher throughput, but setting the prefetch count too high has downsides; see this reference on how to properly tune it.
  • Network latency has a severe impact on throughput, and the smaller the prefetch count, the larger the effect.
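
The second remark can be checked with a back-of-the-envelope model. The sketch below is an assumption for illustration, not something the benchmark utility computes: if at most `prefetch` jobs are in flight and each takes the measured average job latency, throughput is roughly `prefetch / latency`. The `estimated_throughput` helper is hypothetical; the rows are copied from the table above.

```ruby
# Rough model (an assumption, not part of Cuniculus): with `prefetch` jobs
# in flight and each job taking `job_latency_ms`, estimate jobs per second.
def estimated_throughput(prefetch:, job_latency_ms:)
  (prefetch * 1000.0 / job_latency_ms).round
end

# Rows from the table above: [prefetch, average job latency (ms), measured jobs/s]
rows = [
  [1, 2, 481],
  [1, 13, 81],
  [10, 13, 807],
  [50, 103, 481]
]

rows.each do |prefetch, latency_ms, measured|
  estimate = estimated_throughput(prefetch: prefetch, job_latency_ms: latency_ms)
  puts "prefetch=#{prefetch} latency=#{latency_ms}ms estimate=#{estimate} measured=#{measured}"
end
```

The estimate tracks the measured values for small prefetch counts. It overshoots once other limits dominate: for a prefetch count of 50 at 13 ms it gives about 3846 jobs/s against a measured 2500.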

Configuration

Configuration is done through code, using Cuniculus.configure.

Example:

require "cuniculus"

# The following Hash is passed as is to Bunny, the library that integrates with RabbitMQ.
rabbitmq_conn = {
  host: 'rabbitmq', # default is 127.0.0.1
  port: 5672,
  ssl: false,
  vhost: '/',
  user: 'guest',
  pass: 'guest',
  auth_mechanism: 'PLAIN',
}

Cuniculus.configure do |cfg|
  cfg.rabbitmq_opts = rabbitmq_conn
  cfg.pub_pool_size = 5 # Only affects job producers
  cfg.dead_queue_ttl = 1000 * 60 * 60 * 24 * 30 # keep failed jobs for 30 days
  cfg.add_queue({ name: "critical", durable: true, max_retry: 10, prefetch_count: 1 })
end

To configure the queue used by a worker, use cuniculus_options:

class MyWorker
  extend Cuniculus::Worker

  cuniculus_options queue: "critical"
  def perform
    # code
  end
end

More examples

There is also a more complete example in the Cuniculus repository itself. To run it, clone the repository, then

  • start the Ruby and RabbitMQ containers using Docker Compose:
    docker compose up -d
    
  • from within the cuniculus container, produce a job:
    ruby -Ilib examples/produce.rb
    
  • also from within the container, start the consumer:
    bin/cuniculus -I examples/ -r examples/init_cuniculus.rb
    

The -I examples option adds the examples/ directory to the load path, and -r examples/init_cuniculus.rb requires init_cuniculus.rb before starting the consumer. The latter is where configuration such as that described in the configuration section should go.

Error handling

By default, exceptions raised when consuming a job are logged to STDOUT. This can be overridden with the Cuniculus.error_handler method:

Cuniculus.error_handler do |e|
  puts "Oh nein! #{e}"
  LoggingService.send(e)
end

The method expects a block that receives the exception and runs in the scope of the Worker instance.
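
To make "runs in the scope of the Worker instance" concrete, here is a minimal plain-Ruby sketch. It assumes the block is evaluated with something like instance_exec, which this README does not confirm; SketchWorker and handler are hypothetical names, not Cuniculus internals.

```ruby
# Hypothetical stand-in for a worker class; not a Cuniculus class.
class SketchWorker
  def name
    "SketchWorker"
  end

  def perform
    raise ArgumentError, "boom"
  end
end

# The handler calls `name` without a receiver; it resolves against whatever
# object the block is evaluated in.
handler = proc { |e| "#{name} failed: #{e.message}" }

worker = SketchWorker.new
result = begin
  worker.perform
rescue StandardError => e
  # instance_exec runs the block with `self` set to the worker instance,
  # so `name` inside the handler means worker.name.
  worker.instance_exec(e, &handler)
end

puts result
```

Under this assumption, instance methods and instance variables of the worker are directly available inside the handler block.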

Publisher proper shutdown

When perform_async is called, the job is first put into a local (in-memory) queue, from which it is published to RabbitMQ by a worker in a worker pool (whose size is configured with config.pub_pool_size).

To ensure Cuniculus tries to finish publishing jobs on shutdown, it's important that Cuniculus.shutdown is called. Once this method is called, workers have a grace period to publish enqueued jobs, after which the shutdown is forced. The period is set in seconds in config.pub_shutdown_grace_period (defaults to 50).

Example code for the Puma web server:

on_worker_shutdown do
  Cuniculus.shutdown
end
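
The publishing flow described above (local queue, worker pool, grace period on shutdown) can be sketched with Ruby's standard Queue and Thread. This is an illustration under stated assumptions, not Cuniculus' implementation: SketchPublisher and its parameters are hypothetical, and the block passed to it stands in for the actual publish to RabbitMQ.

```ruby
# Hypothetical illustration of an in-memory queue drained by a thread pool.
class SketchPublisher
  def initialize(pool_size:, grace_period:, &publish)
    @jobs = Queue.new
    @grace_period = grace_period
    @publish = publish
    @threads = Array.new(pool_size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and empty,
        # which ends the loop.
        while (job = @jobs.pop)
          @publish.call(job)
        end
      end
    end
  end

  def enqueue(job)
    @jobs << job
  end

  # Stops accepting jobs, waits up to the grace period (in seconds) for the
  # pool to drain the queue, then kills any thread still running.
  # Returns true if everything was published in time.
  def shutdown
    @jobs.close
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @grace_period
    @threads.each do |thread|
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      thread.join([remaining, 0].max)
    end
    finished = @threads.none?(&:alive?)
    @threads.each(&:kill) unless finished
    finished
  end
end

published = Queue.new
publisher = SketchPublisher.new(pool_size: 2, grace_period: 2) { |job| published << job }
5.times { |i| publisher.enqueue("job-#{i}") }
drained = publisher.shutdown
puts "drained=#{drained} published=#{published.size}"
```

The sketch shows why calling shutdown matters: without it, jobs still sitting in the in-memory queue when the process exits are never published.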

Retry mechanism

Retries are enabled by default (with 8 retries) with an exponential backoff, meaning the time between retries increases the more failures happen. The formula for calculating the times between retries can be found in {Cuniculus::QueueConfig}, namely in the x-message-ttl line. As an example, the time between the 7th and 8th retries is roughly 29 days.

Given a queue in the configuration, Cuniculus declares on RabbitMQ the corresponding base queue, in addition to its retry queues. As an example, let's consider the default queue cun_default: Cuniculus declares a cun_default queue, together with some cun_default_{n} queues used for job retries.

When a job raises an exception, it is placed into the cun_default_1 queue for the first retry. It stays there for some pre-defined time, and then gets moved back into the cun_default queue for execution.

If it fails again, it gets moved to cun_default_2, where it stays for a longer period until it's moved back directly into the cun_default queue again.

This goes on until there are no more retry attempts, in which case the job gets moved into the cun_dead queue. From there it can only be moved back into the cun_default queue manually (from RabbitMQ itself, not with Cuniculus); otherwise it is discarded after some time, defined by {Cuniculus::Config.dead_queue_ttl}, in milliseconds (by default, 180 days).

Note that if a job cannot even be parsed, it is moved straight to the dead queue, as there's no point in retrying.
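
The routing described in this section can be summarized in a few lines. The next_queue helper below is hypothetical and only mirrors the queue names used in the text; the exact point at which a job is declared dead (here, the failure after the last retry) is an assumption.

```ruby
# Hypothetical helper: which queue a job is moved to after its n-th failure.
# Queue names follow the text: "<base>_<n>" retry queues and "cun_dead".
def next_queue(base_queue, failure_count, max_retry: 8)
  # Assumption: once all retries are used up, the next failure is final.
  return "cun_dead" if failure_count > max_retry

  "#{base_queue}_#{failure_count}"
end

(1..9).each do |n|
  puts "failure #{n}: #{next_queue('cun_default', n)}"
end
```

With the default of 8 retries, failures 1 through 8 land in cun_default_1 through cun_default_8, and the ninth failure sends the job to cun_dead.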

Health check plugin

Cuniculus ships with a health check plugin. When enabled, a Rack server is started (so the Rack gem is required, as well as the handler in use), which responds with 200 OK to requests on the configured port and path.

Enable it with Cuniculus.plugin(:health_check), which binds the server to 0.0.0.0:3000, listening on the /healthcheck path. To configure the server, pass additional options:

Cuniculus.plugin(:health_check, { "bind_to" => "127.0.0.1", "port" => 3003, "path" => "ping" })

Check {Cuniculus::Plugins::HealthCheck} for further details.

Note that the default handler "webrick" is not bundled by default with Ruby 3 and needs to be installed separately, if it is to be used.
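
For readers unfamiliar with Rack, the kind of application such a plugin serves can be sketched as a plain lambda. This is not the plugin's code: health_check_app is a hypothetical helper, and answering 404 on other paths is an assumption. The sketch needs no gems because a Rack app is just an object that responds to call.

```ruby
# Hypothetical builder for a minimal Rack-style health check application.
# The real plugin is Cuniculus::Plugins::HealthCheck.
def health_check_app(path: "/healthcheck")
  lambda do |env|
    if env["PATH_INFO"] == path
      [200, { "content-type" => "text/plain" }, ["OK"]]
    else
      # Assumption: anything else is treated as not found.
      [404, { "content-type" => "text/plain" }, ["Not Found"]]
    end
  end
end

app = health_check_app
status, _headers, body = app.call("REQUEST_METHOD" => "GET", "PATH_INFO" => "/healthcheck")
puts "#{status} #{body.join}"
```

A container orchestrator or load balancer can then poll that path and treat any response other than 200 as unhealthy.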

How it works

Cuniculus code and conventions are very much inspired by another Ruby job queue library: Sidekiq.

To communicate with RabbitMQ, Cuniculus uses Bunny.

The first time an async job is produced, a thread pool is created, each thread with its own communication channel to RabbitMQ. These threads push jobs to RabbitMQ.

For consuming, each queue will have a corresponding thread pool (handled by Bunny) for concurrency.

License

Cuniculus is licensed under the "BSD 2-Clause License". See LICENSE for details.
