Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade jnats.rb wrapper to use jnats.jar 2.6.0 #42

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
sudo: false
language: ruby
jdk: openjdk8
rvm:
- 2.3.0
- jruby-9.1.7.0
before_install:
- gem install bundler -v 1.14.3
- gem install bundler
- gem update --system
- wget https://github.com/nats-io/gnatsd/releases/download/v1.3.0/gnatsd-v1.3.0-linux-amd64.zip
- unzip gnatsd-v1.3.0-linux-amd64.zip
- ./gnatsd-v1.3.0-linux-amd64/gnatsd &
Binary file removed ext/jars/jnats-1.1-SNAPSHOT.jar
Binary file not shown.
Binary file added ext/jars/jnats-2.6.0.jar
Binary file not shown.
6 changes: 3 additions & 3 deletions lib/protobuf/nats/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,11 @@ def send_request_through_nats
end

parse_response
rescue ::Protobuf::Nats::Errors::IOException => error
rescue ::Protobuf::Nats::Errors::IOException, ::Protobuf::Nats::Errors::IllegalStateException => error
::Protobuf::Nats.log_error(error)

delay = reconnect_delay
logger.warn "An IOException was raised. We are going to sleep for #{delay} seconds."
logger.warn "An #{error.class} was raised. We are going to sleep for #{delay} seconds."
sleep delay

retry if (retries -= 1) > 0
Expand Down Expand Up @@ -207,7 +207,7 @@ def nats_request_with_two_responses(subject, data, opts)
begin
completed_request = false

if !sub_inbox.subscription.is_valid # replace the subscription if is has been pooled but is no longer valid (maybe a reconnect)
if !sub_inbox.subscription.is_active # replace the subscription if is has been pooled but is no longer valid (maybe a reconnect)
nats.unsubscribe(sub_inbox.subscription)
sub_inbox.swap(new_subscription_inbox) # this line replaces the sub_inbox in the connection pool if necessary
end
Expand Down
9 changes: 9 additions & 0 deletions lib/protobuf/nats/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ class ResponseTimeout < Base
class MriIOException < ::StandardError
end

class MriIllegalStateException < ::StandardError
end

IllegalStateException = if defined? JRUBY_VERSION
java.lang.IllegalStateException
else
MriIllegalStateException
end

IOException = if defined? JRUBY_VERSION
java.io.IOException
else
Expand Down
251 changes: 138 additions & 113 deletions lib/protobuf/nats/jnats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,82 @@
require ::File.join(ext_base, "jars/slf4j-api-1.7.25.jar")
require ::File.join(ext_base, "jars/slf4j-simple-1.7.25.jar")
require ::File.join(ext_base, "jars/gson-2.6.2.jar")
require ::File.join(ext_base, "jars/jnats-1.1-SNAPSHOT.jar")

# Set field accessors so we can access the member variables directly.
class Java::IoNatsClient::SubscriptionImpl
field_accessor :pMsgs, :pBytes, :delivered
end
require ::File.join(ext_base, "jars/jnats-2.6.0.jar")

module Protobuf
module Nats
class JNats
attr_reader :connection, :options
attr_reader :connection, :dispatcher, :options

class MessageHandlerProxy
include ::Java::IoNatsClient::MessageHandler

def self.empty
new {}
end

def initialize(&block)
@cb = block
end

def onMessage(message)
@cb.call(message.getData.to_s, message.getReplyTo, message.getSubject)
end
end

class ConnectionListener
include ::Java::IoNatsClient::ConnectionListener

def initialize
@on_reconnect_cb = lambda {}
@on_disconnect_cb = lambda {}
@on_close_cb = lambda {}
end

def on_close(&block); @on_close_cb = block; end
def on_disconnect(&block); @on_disconnect_cb = block; end
def on_reconnect(&block); @on_reconnect_cb = block; end

def connectionEvent(conn, event_type)
case event_type
when ::Java::IoNatsClient::ConnectionListener::Events::RECONNECTED
@on_reconnect_cb.call
when ::Java::IoNatsClient::ConnectionListener::Events::DISCONNECTED
@on_disconnect_cb.call
when ::Java::IoNatsClient::ConnectionListener::Events::CLOSED
@on_close_cb.call
end
end
end

class ErrorListener
include ::Java::IoNatsClient::ErrorListener

def initialize
@on_error_cb = lambda { |_error| }
@on_exception_cb = lambda { |_exception| }
@on_slow_consumer_cb = lambda { |_consumer| }
end

def on_error(&block)
return if block.nil? || block.arity != 1
@on_error_cb = block
end

def on_exception(&block)
return if block.nil? || block.arity != 1
@on_exception_cb = block
end

def on_slow_consumer(&block)
return if block.nil? || block.arity != 1
@on_slow_consumer_cb = block
end

def errorOccurred(_conn, error); @on_error_cb.call(error); end
def exceptionOccurred(_conn, exception); @on_exception_cb.call(exception); end
def slowConsumerDetected(_conn, consumer); @on_slow_consumer_cb.call(consumer); end
end

class Message
attr_reader :data, :subject, :reply
Expand All @@ -26,10 +91,9 @@ def initialize(nats_message)
end

def initialize
@on_error_cb = lambda {|error|}
@on_reconnect_cb = lambda {}
@on_disconnect_cb = lambda {}
@on_close_cb = lambda {}
@connection_listener = ConnectionListener.new
@error_listener = ErrorListener.new

@options = nil
@subz_cbs = {}
@subz_mutex = ::Mutex.new
Expand All @@ -39,35 +103,26 @@ def connect(options = {})
@options ||= options

servers = options[:servers] || ["nats://localhost:4222"]
servers = [servers].flatten.map { |uri_string| java.net.URI.new(uri_string) }
connection_factory = ::Java::IoNatsClient::ConnectionFactory.new
connection_factory.setServers(servers)
connection_factory.setMaxReconnect(options[:max_reconnect_attempts])
servers = [servers].flatten

builder = ::Java::IoNatsClient::Options::Builder.new
builder.servers(servers)
builder.maxReconnects(options[:max_reconnect_attempts])
builder.errorListener(@error_listener)

# Shrink the pending buffer to always raise an error and let the caller retry.
if options[:disable_reconnect_buffer]
connection_factory.setReconnectBufSize(1)
builder.reconnectBufferSize(1)
end

# Setup callbacks
connection_factory.setDisconnectedCallback { |event| @on_disconnect_cb.call }
connection_factory.setReconnectedCallback { |_event| @on_reconnect_cb.call }
connection_factory.setClosedCallback { |_event| @on_close_cb.call }
connection_factory.setExceptionHandler { |error| @on_error_cb.call(error) }

# Setup ssl context if we're using tls
if options[:uses_tls]
ssl_context = create_ssl_context(options)
connection_factory.setSecure(true)
connection_factory.setSSLContext(ssl_context)
builder.sslContext(ssl_context)
end

@connection = connection_factory.createConnection

# We're going to spawn a consumer and supervisor
@work_queue = @connection.createMsgChannel
spwan_supervisor_and_consumer

@connection = ::Java::IoNatsClient::Nats.connect(builder.build)
@dispatcher = @connection.createDispatcher(MessageHandlerProxy.empty)
@connection
end

Expand All @@ -80,134 +135,80 @@ def connection

# Do not depend on #close for a graceful disconnect.
def close
@connection.close rescue nil
if @connection
@connection.closeDispatcher(@dispatcher) rescue nil
@connection.close rescue nil
end
@dispatcher = nil
@connection = nil
@supervisor.kill rescue nil
@supervisor = nil
@consumer.kill rescue nil
@supervisor = nil
end

def flush(timeout_sec = 0.5)
connection.flush(timeout_sec * 1000)
duration = duration_in_ms(timeout_sec * 1000)
connection.flush(duration)
end

def next_message(sub, timeout_sec)
nats_message = sub.nextMessage(timeout_sec * 1000)
duration = duration_in_ms(timeout_sec * 1000)
nats_message = sub.nextMessage(duration)
return nil unless nats_message
Message.new(nats_message)
end

def publish(subject, data, mailbox = nil)
# The "true" here is to force flush. May not need this.
connection.publish(subject, mailbox, data.to_java_bytes, true)
connection.publish(subject, mailbox, data.to_java_bytes)
connection.flush(nil)
end

def subscribe(subject, options = {}, &block)
queue = options[:queue]
max = options[:max]
work_queue = nil
# We pass our work queue for processing async work because java nats
# uses a cahced thread pool: 1 thread per async subscription.
# Sync subs need their own queue so work is not processed async.
work_queue = block.nil? ? connection.createMsgChannel : @work_queue
sub = connection.subscribe(subject, queue, nil, work_queue)

# Register the block callback. We only lock to save the callback.

if block
@subz_mutex.synchronize do
@subz_cbs[sub.getSid] = block
end
handler = MessageHandlerProxy.new(&block)
sub = subscribe_using_subscription_dispatcher(subject, queue, handler)
# Auto unsub if max message option was provided.
dispatcher.unsubscribe(sub, max) if max
sub
else
sub = subscribe_using_connection(subject, queue)
sub.unsubscribe(max) if max
sub
end

# Auto unsub if max message option was provided.
sub.autoUnsubscribe(max) if max

sub
end

def unsubscribe(sub)
return if sub.nil?

# Cleanup our async callback
if @subz_cbs[sub.getSid]
@subz_mutex.synchronize do
@subz_cbs.delete(sub.getSid)
end
if sub.getDispatcher
dispatcher.unsubscribe(sub)
else
sub.unsubscribe()
end

# The "true" here is to ignore and invalid conn.
sub.unsubscribe(true)
end

def new_inbox
"_INBOX.#{::SecureRandom.hex(13)}"
end

def on_reconnect(&cb)
@on_reconnect_cb = cb
@connection_listener.on_reconnect(&cb)
end

def on_disconnect(&cb)
@on_disconnect_cb = cb
@connection_listener.on_disconnect(&cb)
end

def on_error(&cb)
@on_error_cb = cb
@error_listener.on_exception(&cb)
end

def on_close(&cb)
@on_close_cb = cb
@connection_listener.on_close(&cb)
end

private

def spwan_supervisor_and_consumer
spawn_consumer
@supervisor = ::Thread.new do
loop do
begin
sleep 1
next if @consumer && @consumer.alive?
# We need to recreate the consumer thread
@consumer.kill if @consumer
spawn_consumer
rescue => error
@on_error_cb.call(error)
end
end
end
end

def spawn_consumer
@consumer = ::Thread.new do
loop do
begin
message = @work_queue.take
next unless message
sub = message.getSubscription

# We have to update the subscription stats so we're not considered a slow consumer.
begin
sub.lock
sub.pMsgs -= 1
sub.pBytes -= message.getData.length if message.getData
sub.delivered += 1 unless sub.isClosed
ensure
sub.unlock
end

# We don't need t
callback = @subz_cbs[sub.getSid]
next unless callback
callback.call(message.getData.to_s, message.getReplyTo, message.getSubject)
rescue => error
@on_error_cb.call(error)
end
end
end
end

# Jruby-openssl depends on bouncycastle so our lives don't suck super bad
def read_pem_object_from_file(path)
fail ::ArgumentError, "Tried to read a PEM key or cert with path nil" if path.nil?
Expand Down Expand Up @@ -251,6 +252,30 @@ def create_ssl_context(options)
context.init(key_manager.getKeyManagers, trust_manager.getTrustManagers, nil)
context
end

def duration_in_ms(ms)
::Java::JavaTime::Duration.ofMillis(ms)
end

def subscribe_using_connection(subject, queue)
if queue
connection.subscribe(subject, queue)
else
connection.subscribe(subject)
end
end

def subscribe_using_subscription_dispatcher(subject, queue, handler)
if queue
dispatcher.java_send(:subscribe,
[::Java::JavaLang::String, ::Java::JavaLang::String, ::Java::IoNatsClient::MessageHandler],
subject, queue, handler)
else
dispatcher.java_send(:subscribe,
[::Java::JavaLang::String, ::Java::IoNatsClient::MessageHandler],
subject, handler)
end
end
end
end
end
Loading