Skip to content

Commit

Permalink
Merge pull request #60 from paddor/master
Browse files Browse the repository at this point in the history
This ports Celluloid::ZMQ to CZTop
  • Loading branch information
chuckremes authored Jun 21, 2016
2 parents a2df418 + f9ecd4d commit 6e4b9ed
Show file tree
Hide file tree
Showing 13 changed files with 165 additions and 180 deletions.
25 changes: 15 additions & 10 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
before_install:
- sudo apt-get install libzmq3-dev
- if [ "$TRAVIS_RUBY_VERSION" == "rbx-2" ] ; then export BUNDLE_JOBS=1 ; else export BUNDLE_JOBS=4; fi
sudo: required
script: rake ci
- PATH="/usr/lib/ccache:$PATH" # enable ccache
- export LD_LIBRARY_PATH=$HOME/lib # custom libs (for execution)
- export PKG_CONFIG_PATH=$HOME/lib/pkgconfig # custom libs (for linking)
- export BUNDLE_PATH=$HOME/.bundle # bundle caching
- ( mkdir -p vendor && cd vendor && git clone --depth 1 https://github.com/paddor/czmq-ffi-gen && cd czmq-ffi-gen && ci-scripts/install-libzmq && ci-scripts/install-libczmq; )
sudo: false
cache:
directories:
- $HOME/.ccache
- $HOME/.bundle
script: bundle exec rake ci
language: ruby
install: bundle install --without=development
rvm:
- rbx
- jruby-1.7.25
- 2.2.2
- rbx-2
- 2.2.3
- jruby-9.1.2.0
- 2.1.8
- 2.3.0
- 2.3.1
- ruby-head
- jruby-head
matrix:
fast_finish: true
allow_failures:
- rvm: rbx-2
- rvm: jruby-9.1.2.0
- rvm: 2.1.8
- rvm: 2.3.0
- rvm: ruby-head
- rvm: jruby-head
- env: CELLULOID_BACKPORTED=true
Expand All @@ -31,6 +34,8 @@ matrix:
env:
global:
- NUMBER_OF_PROCESSORS=4 CELLULOID_CONFIG_FILE=.env-ci
# recognized by czmq-ffi-gen's ci-scripts
- CZMQ_VERSION=HEAD ZMQ_VERSION=HEAD
matrix:
- CELLULOID_BACKPORTED=true
- CELLULOID_BACKPORTED=false
Expand Down
18 changes: 6 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
[![Coverage Status](https://coveralls.io/repos/celluloid/celluloid-zmq/badge.png?branch=master)](https://coveralls.io/r/celluloid/celluloid-zmq)

`Celluloid::ZMQ` provides Celluloid actors that can interact with [0MQ sockets][0mq].
Underneath, it's built on the [ffi-rzmq][ffi-rzmq] library. `Celluloid::ZMQ` was
Underneath, it's built on the [CZTop][cztop] library. `Celluloid::ZMQ` was
primarily created for the purpose of writing [DCell][dcell], distributed Celluloid
over 0MQ, so before you go building your own distributed Celluloid systems with
`Celluloid::ZMQ`, be sure to give DCell a look and decide if it fits your purposes.

[0mq]: http://www.zeromq.org/
[ffi-rzmq]: https://github.com/chuckremes/ffi-rzmq
[cztop]: https://github.com/paddor/cztop
[dcell]: https://github.com/celluloid/dcell

It provides different `Celluloid::ZMQ::Socket` classes which can be initialized
Expand All @@ -21,13 +21,10 @@ then sent `bind` or `connect`. Once bound or connected, the socket can

## Supported Platforms

Celluloid::IO requires Ruby 1.9 support on all Ruby VMs. You will also need
the ZeroMQ library installed as it's accessed via FFI.
You will need the ZeroMQ library and the CZMQ library installed as it's
accessed via FFI. See [CZTop][cztop] for installation instructions.

Supported VMs are Ruby 1.9.3, JRuby 1.6, and Rubinius 2.0.

To use JRuby in 1.9 mode, you'll need to pass the "--1.9" command line option
to the JRuby executable, or set the "JRUBY_OPTS=--1.9" environment variable.
Supported Rubies are MRI >= 2.2, JRuby >= 9.0.4.0, and Rubinius >= 3.7.

## 0MQ Socket Types

Expand All @@ -45,8 +42,6 @@ The following 0MQ socket types are supported (see [types.rb][types] for more inf
```ruby
require 'celluloid/zmq'

Celluloid::ZMQ.init

class Server
include Celluloid::ZMQ

Expand Down Expand Up @@ -85,8 +80,7 @@ class Client
end

def write(message)
@socket.send(message)

@socket << message
nil
end
end
Expand Down
5 changes: 2 additions & 3 deletions celluloid-zmq.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ require File.expand_path("../culture/sync", __FILE__)
Gem::Specification.new do |gem|
gem.authors = ["Tony Arcieri"]
gem.email = ["[email protected]"]
gem.description = "Celluloid bindings to the ffi-rzmq library"
gem.description = "Celluloid bindings to the CZMQ library"
gem.summary = "Celluloid::ZMQ provides concurrent Celluloid actors that can listen for 0MQ events"
gem.homepage = "http://github.com/celluloid/celluloid-zmq"
gem.license = "MIT"
Expand All @@ -13,8 +13,7 @@ Gem::Specification.new do |gem|
gem.version = Celluloid::ZMQ::VERSION

Celluloid::Sync::Gemspec[gem]
gem.add_dependency "ffi"
gem.add_dependency "ffi-rzmq"
gem.add_dependency "cztop"

# Files
ignores = File.read(".gitignore").split(/\r?\n/).reject { |f| f =~ /^(#.+|\s*)$/ }.map { |f| Dir[f] }.flatten
Expand Down
2 changes: 1 addition & 1 deletion culture
Submodule culture updated 2 files
+4 −4 SYNC.md
+2 −2 gems/README.md
25 changes: 15 additions & 10 deletions lib/celluloid/zmq.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require "ffi-rzmq"
require 'cztop'

$CELLULOID_ZMQ_BACKPORTED = (ENV["CELLULOID_ZMQ_BACKPORTED"] != "false") unless defined?($CELLULOID_ZMQ_BACKPORTED)

Expand Down Expand Up @@ -28,19 +28,22 @@ def included(klass)
klass.mailbox_class Celluloid::ZMQ::Mailbox
end

# Obtain a 0MQ context
def init(worker_threads = 1)
@context ||= ::ZMQ::Context.new(worker_threads)
# @deprecated
def init(*)
Celluloid::Internals::Logger.deprecate("Calling .init isn't needed anymore")
nil
end

# @deprecated
def context
fail UninitializedError, "you must initialize Celluloid::ZMQ by calling Celluloid::ZMQ.init" unless @context
@context
Celluloid::Internals::Logger.deprecate("Accessing ZMQ's context is deprecated")
nil
end

# @deprecated
def terminate
@context.terminate if @context
@context = nil
Celluloid::Internals::Logger.deprecate("Calling .terminate isn't needed anymore")
nil
end
end

Expand Down Expand Up @@ -72,8 +75,10 @@ def wait_writable(socket)
end
module_function :wait_writable

def result_ok?(result)
::ZMQ::Util.resultcode_ok?(result)
# @deprecated
def result_ok?(_result)
Celluloid::Internals::Logger.deprecate("Checking results of ZMQ operations isn't needed anymore")
true
end
module_function :result_ok?
end
Expand Down
35 changes: 20 additions & 15 deletions lib/celluloid/zmq/reactor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,24 @@ class Reactor
extend Forwardable
def_delegator :@waker, :signal, :wakeup
def_delegator :@waker, :cleanup, :shutdown
def_delegator ZMQ, :result_ok?

def initialize
@waker = Waker.new
@poller = ::ZMQ::Poller.new
@poller = ::CZTop::Poller::Aggregated.new
@readers = {}
@writers = {}

rc = @poller.register @waker.socket, ::ZMQ::POLLIN
fail "0MQ poll error: #{::ZMQ::Util.error_string}" unless result_ok? rc
@poller.add_reader(@waker.socket)
end

# Wait for the given ZMQ socket to become readable
def wait_readable(socket)
monitor_zmq socket, @readers, ::ZMQ::POLLIN
monitor_zmq socket, @readers, :read
end

# Wait for the given ZMQ socket to become writable
def wait_writable(socket)
monitor_zmq socket, @writers, ::ZMQ::POLLOUT
monitor_zmq socket, @writers, :write
end

# Monitor the given ZMQ socket with the given options
Expand All @@ -36,7 +34,14 @@ def monitor_zmq(socket, set, type)
set[socket] = Task.current
end

@poller.register socket, type
case type
when :read
@poller.add_reader(socket)
when :write
@poller.add_writer(socket)
else
raise ArgumentError, "wrong type: #{type.inspect}"
end

Task.suspend :zmqwait
socket
Expand All @@ -48,21 +53,21 @@ def run_once(timeout = nil)
if timeout
timeout *= 1000 # Poller uses millisecond increments
else
timeout = :blocking
timeout = 0 # blocking
end

rc = @poller.poll(timeout)

unless result_ok? rc
fail IOError, "0MQ poll error: #{::ZMQ::Util.error_string}"
begin
@poller.wait(timeout)
rescue
raise IOError, "ZMQ poll error: #{$!.message}"
end

@poller.readables.each do |sock|
if sock == @waker.socket
@waker.wait
else
task = @readers.delete sock
@poller.deregister sock, ::ZMQ::POLLIN
@poller.remove_reader(sock)

if task
task.resume
Expand All @@ -74,12 +79,12 @@ def run_once(timeout = nil)

@poller.writables.each do |sock|
task = @writers.delete sock
@poller.deregister sock, ::ZMQ::POLLOUT
@poller.remove_writer(sock)

if task
task.resume
else
Celluloid::Logger.debug "ZMQ error: got write event without associated reader"
Celluloid::Logger.debug "ZMQ error: got write event without associated writer"
end
end
end
Expand Down
73 changes: 40 additions & 33 deletions lib/celluloid/zmq/socket.rb
Original file line number Diff line number Diff line change
@@ -1,75 +1,82 @@
module Celluloid
module ZMQ
class Socket
extend Forwardable
def_delegator ZMQ, :result_ok?

# Create a new socket
def initialize(type)
@socket = Celluloid::ZMQ.context.socket ::ZMQ.const_get(type.to_s.upcase)
type = type.is_a?(Integer) ? type : type.to_s.upcase.to_sym
@socket = CZTop::Socket.new_by_type(type)
@linger = 0
end
attr_reader :linger

# Connect to the given 0MQ address
# Address should be in the form: tcp://1.2.3.4:5678/
def connect(addr)
unless result_ok? @socket.connect addr
fail IOError, "error connecting to #{addr}: #{::ZMQ::Util.error_string}"
end
@socket.connect addr
true
rescue
raise IOError, "error connecting to #{addr}: #{$!.message}"
end

def linger=(value)
@linger = value || -1

unless result_ok? @socket.setsockopt(::ZMQ::LINGER, value)
fail IOError, "couldn't set linger: #{::ZMQ::Util.error_string}"
end
value ||= -1
@socket.options.linger = value
@linger = value
rescue
raise IOError, "couldn't set linger: #{$!.message}"
end

def identity=(value)
unless result_ok? @socket.setsockopt(::ZMQ::IDENTITY, "#{value}")
fail IOError, "couldn't set identity: #{::ZMQ::Util.error_string}"
end
# de @socket.identity = value
@socket.options.identity = "#{value}"
rescue
raise IOError, "couldn't set identity: #{$!.message}"
end

def identity
# de @socket.identity
get(::ZMQ::IDENTITY)
@socket.options.identity
end

def set(option, value, length = nil)
unless result_ok? @socket.setsockopt(option, value, length)
fail IOError, "couldn't set value for option #{option}: #{::ZMQ::Util.error_string}"
end
def set(option, value, _length = nil)
@socket.options[option] = value
rescue
raise IOError, "couldn't set value for option #{option}: #{$!.message}"
end

def get(option)
option_value = []

unless result_ok? @socket.getsockopt(option, option_value)
fail IOError, "couldn't get value for option #{option}: #{::ZMQ::Util.error_string}"
end

option_value[0]
@socket.options[option]
rescue
raise IOError, "couldn't get value for option #{option}: #{$!.message}"
end

# Bind to the given 0MQ address
# Address should be in the form: tcp://1.2.3.4:5678/
def bind(addr)
unless result_ok? @socket.bind(addr)
fail IOError, "couldn't bind to #{addr}: #{::ZMQ::Util.error_string}"
end
@socket.bind(addr)
rescue
raise IOError, "couldn't bind to #{addr}: #{$!.message}"
end

# Close the socket
def close
@socket.close
end
end
end
end

# Hide ffi-rzmq internals
alias_method :inspect, :to_s
unless defined?(::ZMQ)
# Make legacy code like this work:
#
# zmq_socket.set(::ZMQ::IDENTITY, "foo")
# zmq_socket.get(::ZMQ::IDENTITY)
#
# This assumes that the user didn't require 'ffi-rzmq' themselves, but had
# it done by celluloid-zmq.
module ZMQ
def self.const_missing(name)
Celluloid::Internals::Logger.deprecate("Using ZMQ::#{name} as an option name is deprecated. Please report if you need this, so it can be added to Celluloid::ZMQ::Socket.")
return name
end
end
end
Loading

0 comments on commit 6e4b9ed

Please sign in to comment.