A forking server for hosting rabbitMQ consumer processes as load-balanced Rack applications.
$ rack-rabbit --queue myqueue --workers 4 app/config.ru
- Summary
- Installation
- Getting started by example
- More examples
- Server usage
- Server configuration
- Signals
- Forking worker processes
- RabbitMQ acknowledgements
- Client binary
- Client library
- Supported platforms
- TODO
- License
- Credits
- Contact
What Unicorn does for HTTP services, RackRabbit can do for hosting AMQP services, and more:
| | HTTP | AMQP |
---|---|---|
Make a synchronous request/response | Unicorn | rabbitMQ + RackRabbit |
Asynchronous worker queue | Redis + Resque | rabbitMQ + RackRabbit |
Asynchronous publish/subscribe | Redis | rabbitMQ + RackRabbit |
RackRabbit hosts a cluster of worker processes that:
- Subscribe to a queue/exchange
- Convert incoming messages into a suitable Rack environment
- Call your Rack app to handle the message
- Publish a reply back to the original caller (if reply_to was provided)
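The worker steps above can be sketched in plain Ruby. This is only an illustration under stated assumptions, not RackRabbit's actual implementation: the `Message` struct, its field names, and the `build_env` and `handle` helpers are hypothetical, and the environment hash contains just the keys this toy app reads.

```ruby
require "stringio"

# Hypothetical stand-in for an incoming AMQP message; the field names are
# assumptions made for this sketch, not RackRabbit's internal message class.
Message = Struct.new(:verb, :path, :body, :reply_to)

# Build a minimal Rack-style environment hash from a message.
def build_env(message)
  {
    "REQUEST_METHOD" => message.verb.to_s.upcase,
    "PATH_INFO"      => message.path,
    "QUERY_STRING"   => "",
    "rack.input"     => StringIO.new(message.body.to_s)
  }
end

# Call the Rack app, then return [status, body] for the reply,
# or nil when the caller did not ask for one (no reply_to).
def handle(app, message)
  status, _headers, body = app.call(build_env(message))
  return nil if message.reply_to.nil?
  chunks = []
  body.each { |chunk| chunks << chunk }
  [status, chunks.join]
end

# A tiny Rack app that echoes the request back.
app = lambda do |env|
  text = "#{env["REQUEST_METHOD"]} #{env["PATH_INFO"]} #{env["rack.input"].read}"
  [200, {}, [text]]
end

result = handle(app, Message.new(:post, "/submit", "data", "reply.queue"))
```

Note that the app is still called when `reply_to` is missing; only the reply step is skipped, which is what makes the same handler usable for worker-queue and publish/subscribe messages.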
RackRabbit supports a variety of messaging patterns:
- Synchronous Request/Response (e.g. GET/POST/PUT/DELETE)
- Asynchronous Worker queue (e.g. ENQUEUE)
- Asynchronous PubSub (e.g. PUBLISH)
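To make the difference between the synchronous and asynchronous patterns concrete, here is a broker-free simulation using Ruby's in-memory `Queue`. It is a sketch only: the hash keys (`:reply_to`, `:correlation_id`) mirror common AMQP message properties, but the queues and the worker thread are stand-ins, not RackRabbit or rabbitMQ code.

```ruby
require "thread"
require "securerandom"

# In-memory stand-ins for broker queues (assumption: no real broker involved).
requests = Queue.new
replies  = Queue.new

# Worker: take one request, handle it, and reply only if reply_to is present.
worker = Thread.new do
  msg = requests.pop
  result = "#{msg[:method]} #{msg[:path]}"
  if msg[:reply_to]
    msg[:reply_to] << { :correlation_id => msg[:correlation_id], :body => result }
  end
end

# Client (synchronous pattern): publish a request, then block until the
# reply carrying the same correlation id arrives on the reply queue.
id = SecureRandom.uuid
requests << { :method => "GET", :path => "/hello", :reply_to => replies, :correlation_id => id }
reply = replies.pop
worker.join
```

An asynchronous ENQUEUE or PUBLISH would look the same on the sending side except that the message carries no `:reply_to` and the client does not wait on a reply queue.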
Install a rabbitMQ server if necessary (docs):
$ sudo apt-get install rabbitmq-server
Update your Gemfile to include RackRabbit and your preferred rabbitMQ client library:
gem "bunny", "~> 1.4" # or an alternative such as AMQP or march-hare
gem "rack-rabbit", "~> 0.1"
You can use RackRabbit to host an AMQP service as a rack application in the same way that you might use Unicorn to host an HTTP service.
Imagine a simple rack application in config.ru:
class Service
  def self.call(env)
    request = Rack::Request.new(env)
    method = request.request_method
    path = request.path_info
    body = request.body.read
    message = "#{method} #{path} #{body}"
    [ 200, {}, [ message ]]
  end
end
run Service
You can host and load balance this service using rack-rabbit:
$ rack-rabbit --queue myqueue --workers 4 config.ru
Ensure the worker processes are running:
$ ps xawf | grep rack-rabbit
15714 pts/4 Sl+ 0:00 | \_ ruby rack-rabbit --queue myqueue --workers 4 config.ru
15716 pts/4 Sl+ 0:00 | \_ rack-rabbit -- waiting for request
15718 pts/4 Sl+ 0:00 | \_ rack-rabbit -- waiting for request
15721 pts/4 Sl+ 0:00 | \_ rack-rabbit -- waiting for request
15723 pts/4 Sl+ 0:00 | \_ rack-rabbit -- waiting for request
Connect to the worker from the command line using the rr command:
$ rr -q myqueue /hello # synchronous GET request/response
GET /hello
$ rr -q myqueue POST /submit "data" # synchronous POST request/response
POST /submit data
$ rr -q myqueue PUT /update "data" # synchronous PUT request/response
PUT /update data
$ rr -q myqueue DELETE /resource # synchronous DELETE request/response
DELETE /resource
Connect to the worker from your applications using the RR class:
require 'rack-rabbit/client'
RR.get :myqueue, "/hello" # returns "GET /hello"
RR.post :myqueue, "/submit", "data" # returns "POST /submit data"
RR.put :myqueue, "/update", "data" # returns "PUT /update data"
RR.delete :myqueue, "/resource" # returns "DELETE /resource"
See EXAMPLES.md for more detailed examples, including ENQUEUE and PUBLISH communication patterns, and using Sinatra.
Use the rack-rabbit executable to host your Rack app in a forking server that subscribes to either a named queue or an exchange.
$ rack-rabbit --help
A load balanced rack server for hosting rabbitMQ consumer processes.
Usage: rack-rabbit [options] rack-file
Examples:
rack-rabbit -h broker -q my.queue # subscribe to a named queue
rack-rabbit -h broker -e my.exchange -t fanout # subscribe to a fanout exchange
rack-rabbit -h broker -e my.exchange -t topic -r my.topic # subscribe to a topic exchange with a routing key
rack-rabbit -c rack-rabbit.conf # subscribe with advanced options provided by a config file
RackRabbit options:
-c, --config CONFIG provide options using a rack-rabbit configuration file
-q, --queue QUEUE subscribe to a queue for incoming requests
-e, --exchange EXCHANGE subscribe to an exchange for incoming requests
-t, --type TYPE subscribe to an exchange for incoming requests - type (e.g. :direct, :fanout, :topic)
-r, --route ROUTE subscribe to an exchange for incoming requests - routing key
-a, --app_id ID an app_id for this application server
--host HOST the rabbitMQ broker IP address (default: 127.0.0.1)
--port PORT the rabbitMQ broker port (default: 5672)
Process options:
-w, --workers COUNT the number of worker processes (default: 1)
-d, --daemonize run daemonized in the background (default: false)
-p, --pid PIDFILE the pid filename (default when daemonized: /var/run/<app_id>.pid)
-l, --log LOGFILE the log filename (default when daemonized: /var/log/<app_id>.log)
--log-level LEVEL the log level for rack rabbit output (default: info)
--preload preload the rack app before forking worker processes (default: false)
Ruby options:
-I, --include PATH an additional $LOAD_PATH (may be used more than once)
--debug set $DEBUG to true
--warn enable warnings
Common options:
-h, --help
-v, --version
Detailed configuration can be provided by an external config file using the --config option:
# set the Rack application to be used to handle messages (default 'config.ru'):
rack_file 'app/config.ru'
# set the rabbitMQ connection:
rabbit :host => '10.0.0.42', # default '127.0.0.1'
       :port => '1234', # default '5672'
       :adapter => :amqp # default :bunny
# subscribe to a queue:
queue 'my.queue'
# ... or, subscribe to an exchange:
exchange 'my.exchange'
exchange_type :topic
routing_key 'my.topic'
# set the app_id used to identify your application in response messages
app_id 'my-application'
# enable rabbitMQ acknowledgements (default: false):
ack true
# set the initial number of worker processes (default: 1):
workers 8
# set the minimum number of worker processes (default: 1):
min_workers 1
# set the maximum number of worker processes (default: 100):
max_workers 16
# preload the Rack app in the server for faster worker forking (default: false):
preload_app true
# daemonize the process (default: false)
daemonize true
# set the path to the logfile
logfile "/var/log/my-application.log"
# set the path to the pidfile
pidfile "/var/run/my-application.pid"
# set the log level for the Rack Rabbit logger (default: info)
log_level 'debug'
# set the Logger to be used by the Rack Rabbit server and the worker Rack applications (default: Logger)
logger MyLogger.new
Signals should be sent to the master process:
- HUP - reload the RackRabbit config file and gracefully restart all workers
- QUIT - graceful shutdown, waits for workers to complete handling of their current message before finishing
- TERM - quick shutdown kills all workers immediately
- INT - quick shutdown kills all workers immediately
- TTIN - increase the number of worker processes by one
- TTOU - decrease the number of worker processes by one
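Signals can also be sent from Ruby rather than from the shell. The sketch below reads a pid from a pidfile and uses the standard `Process.kill`; the `signal_master` helper is a hypothetical name. Because no real RackRabbit master is assumed to be running, the demo forks a stand-in process that merely traps TTIN and reports it, so it shows the signalling mechanics only, not the server's reaction.

```ruby
require "tempfile"

# Read the master pid from a pidfile and send it a signal.
def signal_master(pidfile, signal)
  pid = Integer(File.read(pidfile).strip)
  Process.kill(signal, pid)
  pid
end

# Demo only: fork a stand-in "master" that traps TTIN and reports it on a pipe.
report_r, report_w = IO.pipe
ready_r, ready_w = IO.pipe

child = fork do
  report_r.close
  ready_r.close
  received = nil
  trap("TTIN") { received = "TTIN" }
  ready_w.write("ok")              # tell the parent the trap is installed
  ready_w.close
  50.times { break if received; sleep 0.1 }
  report_w.write(received.to_s)
  report_w.close
  exit!(0)
end

report_w.close
ready_w.close
ready_r.read                       # wait until the child is ready

# Write the pidfile the way a daemonized server would (path is temporary here).
pidfile = Tempfile.new("master.pid")
pidfile.write(child.to_s)
pidfile.flush

signalled_pid = signal_master(pidfile.path, "TTIN")
observed = report_r.read
Process.wait(child)
pidfile.close!
```

Against a real daemonized server, the same helper would be pointed at the configured pidfile, for example `signal_master("/var/run/my-application.pid", "QUIT")` for a graceful shutdown.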
If you are using the preload_app directive, your app will be loaded into the master server process before any workers have forked. Therefore, you may need to re-initialize resources after each worker process forks, e.g. if using ActiveRecord:
before_fork do |server|
  # no need for connection in the server process
  defined?(ActiveRecord::Base) and ActiveRecord::Base.connection.disconnect!
end
after_fork do |server, worker|
  # reestablish connection in each worker process
  defined?(ActiveRecord::Base) and ActiveRecord::Base.establish_connection
end
This should NOT be needed when the preload_app directive is false.
Note: this is an issue with any preforking style server (e.g. Unicorn).
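The reason for the hooks can be seen with a small stdlib-only experiment. The `Connection` class below is hypothetical: it just records which process opened it, standing in for a database or broker connection. A forked worker inherits the master's copy, which no longer belongs to the current process, until the worker opens its own, which is the role an after_fork hook plays.

```ruby
# Hypothetical resource that remembers which process opened it.
class Connection
  attr_reader :owner_pid

  def initialize
    @owner_pid = Process.pid
  end

  # True when this object was created in a different process.
  def stale?
    owner_pid != Process.pid
  end
end

# Opened in the master, as would happen with a preloaded app.
connection = Connection.new

reader, writer = IO.pipe
worker = fork do
  reader.close
  stale_before = connection.stale?   # inherited copy still belongs to the master
  connection = Connection.new        # what an after_fork hook would do
  writer.puts("#{stale_before} #{connection.stale?}")
  writer.close
  exit!(0)
end

writer.close
report = reader.read.strip
Process.wait(worker)
```

The master's own `connection` is untouched by the worker's reassignment, since each process has its own copy of memory after the fork.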
By default, an AMQP broker removes a message from the queue immediately after sending it to the consumer. If the consumer dies before processing the message completely then the message is lost. Users who need more control can configure the broker to use explicit acknowledgements (learn more) by setting the RackRabbit ack configuration option to true.
With explicit acknowledgements enabled...
- If your rack handler succeeds (returns a 2xx status code) then RackRabbit will automatically send an acknowledgement to rabbitMQ.
- If your rack handler fails (raises an exception or returns a non-2xx status code) then RackRabbit will automatically send a rejection to rabbitMQ. You might want to set up a dead-letter queue for these rejections.
- If your rack handler process crashes then rabbitMQ will hand the message off to the next available worker process.
If your service action is idempotent then nothing more is needed.
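The first two rules above amount to a small decision function. The sketch below is an illustration of those rules, not RackRabbit's source: `ack_decision` is a hypothetical helper that runs a handler block and returns `:ack` or `:reject`. The third rule (redelivery after a crash) is broker behaviour and is not modelled here.

```ruby
# Run the Rack handler (given as a block) and decide what to tell the broker.
# Returns :ack for a 2xx status, :reject for any other status or an exception.
def ack_decision
  status, _headers, _body = yield
  (200..299).cover?(status.to_i) ? :ack : :reject
rescue StandardError
  :reject
end

ok      = ack_decision { [200, {}, ["done"]] }
created = ack_decision { [201, {}, []] }
failed  = ack_decision { [500, {}, ["oops"]] }
raised  = ack_decision { raise "handler blew up" }
```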
However, if you need more fine-grained controls, then RackRabbit exposes the underlying message to your application in the rack environment as env['rabbit.message']. You can use this object to explicitly acknowledge or reject the message at any time during your rack handler, e.g.:
post "/work" do
  message = request.env['rabbit.message']
  # ... do some preliminary work (idempotent)
  if everything_looks_good
    message.ack # take responsibility
    # ... and do some more work (that might not be idempotent)
  else
    message.reject # reject the message
    # ... and (maybe) do some more work
  end
end
Communicating with a RackRabbit hosted service from the command line can be done using the rr binary:
Make a request to a RackRabbit service.
Usage: rr <command> [options] [METHOD] [PATH] [BODY]
list of commands:
request make a synchronous request to a rabbitMQ queue and wait for a reply
enqueue make an asynchronous request to a rabbitMQ queue and continue
publish make an asynchronous request to a rabbitMQ exchange with a routing key
help show help for a given topic or a help overview
version show version
Examples:
rr request -q queue GET /hello # submit GET to queue and WAIT for reply
rr request -q queue POST /submit 'data' # submit POST to queue and WAIT for reply
rr enqueue -q queue POST /submit 'data' # submit POST to queue and CONTINUE
rr enqueue -q queue DELETE /resource # submit DELETE to queue and CONTINUE
rr publish -e ex -t fanout POST /event # submit POST to a fanout exchange and CONTINUE
rr publish -e ex -t topic -r foo POST /submit 'data' # submit POST to a topic exchange with routing key and CONTINUE
RackRabbit options:
--host HOST the rabbitMQ broker IP address (default: 127.0.0.1)
--port PORT the rabbitMQ broker port (default: 5672)
-q, --queue QUEUE a queue for publishing outgoing requests
-e, --exchange EXCHANGE publish to a non-default exchange - name
-t, --type TYPE publish to a non-default exchange - type (e.g. :direct, :fanout, :topic)
-r, --route ROUTE a routing key when publishing to a non-default exchange
Ruby options:
-I, --include PATH specify an additional $LOAD_PATH (may be used more than once)
--debug set $DEBUG to true
--warn enable warnings
Common options:
-h, --help
-v, --version
Communicating with a RackRabbit hosted service from your application can be done using the RR class:
RR.get( "myqueue", "/path/to/resource")
RR.post( "myqueue", "/path/to/resource", "content")
RR.put( "myqueue", "/path/to/resource", "content")
RR.delete( "myqueue", "/path/to/resource")
RR.enqueue("myqueue", "/path/to/resource", "content")
RR.publish("myexchange", "/path/to/resource", "content")
These methods are wrappers around a more detailed RackRabbit::Client class:
client = RackRabbit::Client.new(:host => "127.0.0.1", :port => 5672, :adapter => :bunny)
client.get( "myqueue", "/path/to/resource")
client.post( "myqueue", "/path/to/resource", "content")
client.put( "myqueue", "/path/to/resource", "content")
client.delete("myqueue", "/path/to/resource")
client.enqueue("myqueue", "/path/to/resource", "content")
client.publish("myexchange", "/path/to/resource", "content")
client.disconnect
More advanced options can be passed as an (optional) last parameter, e.g.:
client.post("myqueue", "/path", content.to_json, {
  :headers => { "additional" => "header" }, # made available in the service's rack env
  :priority => 5, # specify the rabbitMQ message priority
  :content_type => "application/json", # specify the request content_type
  :content_encoding => "utf-8" # specify the request content_encoding
})
client.publish("myexchange", "/path", "content", {
  :exchange_type => :topic, # specify the rabbitMQ exchange type
  :routing_key => "my.custom.topic", # specify a custom rabbitMQ routing key
})
- MRI 2.1.2
- MRI 1.9.3
- FEATURE - allow multiple Client#request calls in parallel (block until all have replied), à la Typhoeus
- FEATURE - share a single reply queue across all Client#request
- FEATURE - automatically deserialize body for known content type (e.g. json)
- FEATURE - have exception stack trace sent back to client in development/test mode
- FEATURE - support JRuby
- FEATURE - support Rubinius
- BUG - avoid infinite worker spawn loop if worker fails during startup (e.g. connection to rabbit fails)
- TEST - integration tests for worker, server, adapter/bunny, and adapter/amqp
See LICENSE file.
Thanks to Jesse Storimer for his book Working with Unix Processes.
Thanks to the Unicorn Team for providing a great example of a preforking server.
Thanks to the Bunny Team for providing an easy rabbitMQ Ruby client.
If you have any ideas, feedback, requests or bug reports, you can reach me at [email protected], or via my website: Code inComplete.