diff --git a/.travis.yml b/.travis.yml index ecacb07..a422932 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,12 @@ sudo: false language: ruby +jdk: openjdk8 rvm: - 2.3.0 - jruby-9.1.7.0 before_install: - - gem install bundler -v 1.14.3 + - gem install bundler - gem update --system + - wget https://github.com/nats-io/gnatsd/releases/download/v1.3.0/gnatsd-v1.3.0-linux-amd64.zip + - unzip gnatsd-v1.3.0-linux-amd64.zip + - ./gnatsd-v1.3.0-linux-amd64/gnatsd & diff --git a/ext/jars/jnats-1.1-SNAPSHOT.jar b/ext/jars/jnats-1.1-SNAPSHOT.jar deleted file mode 100644 index ddd22c5..0000000 Binary files a/ext/jars/jnats-1.1-SNAPSHOT.jar and /dev/null differ diff --git a/ext/jars/jnats-2.6.0.jar b/ext/jars/jnats-2.6.0.jar new file mode 100644 index 0000000..740d6c0 Binary files /dev/null and b/ext/jars/jnats-2.6.0.jar differ diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index 3ac33e4..08b8627 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -165,11 +165,11 @@ def send_request_through_nats end parse_response - rescue ::Protobuf::Nats::Errors::IOException => error + rescue ::Protobuf::Nats::Errors::IOException, ::Protobuf::Nats::Errors::IllegalStateException => error ::Protobuf::Nats.log_error(error) delay = reconnect_delay - logger.warn "An IOException was raised. We are going to sleep for #{delay} seconds." + logger.warn "An #{error.class} was raised. We are going to sleep for #{delay} seconds." sleep delay retry if (retries -= 1) > 0 @@ -207,7 +207,7 @@ def nats_request_with_two_responses(subject, data, opts) begin completed_request = false - if !sub_inbox.subscription.is_valid # replace the subscription if is has been pooled but is no longer valid (maybe a reconnect) + if !sub_inbox.subscription.is_active # replace the subscription if is has been pooled but is no longer valid (maybe a reconnect) nats.unsubscribe(sub_inbox.subscription) sub_inbox.swap(new_subscription_inbox) # this line replaces the sub_inbox in the connection pool if necessary end diff --git a/lib/protobuf/nats/errors.rb b/lib/protobuf/nats/errors.rb index 15ece99..d8cf82e 100644 --- a/lib/protobuf/nats/errors.rb +++ b/lib/protobuf/nats/errors.rb @@ -13,6 +13,15 @@ class ResponseTimeout < Base class MriIOException < ::StandardError end + class MriIllegalStateException < ::StandardError + end + + IllegalStateException = if defined? JRUBY_VERSION + java.lang.IllegalStateException + else + MriIllegalStateException + end + IOException = if defined? JRUBY_VERSION java.io.IOException else diff --git a/lib/protobuf/nats/jnats.rb b/lib/protobuf/nats/jnats.rb index 4f42bf3..0ab6aa8 100644 --- a/lib/protobuf/nats/jnats.rb +++ b/lib/protobuf/nats/jnats.rb @@ -3,17 +3,82 @@ require ::File.join(ext_base, "jars/slf4j-api-1.7.25.jar") require ::File.join(ext_base, "jars/slf4j-simple-1.7.25.jar") require ::File.join(ext_base, "jars/gson-2.6.2.jar") -require ::File.join(ext_base, "jars/jnats-1.1-SNAPSHOT.jar") - -# Set field accessors so we can access the member variables directly. -class Java::IoNatsClient::SubscriptionImpl - field_accessor :pMsgs, :pBytes, :delivered -end +require ::File.join(ext_base, "jars/jnats-2.6.0.jar") module Protobuf module Nats class JNats - attr_reader :connection, :options + attr_reader :connection, :dispatcher, :options + + class MessageHandlerProxy + include ::Java::IoNatsClient::MessageHandler + + def self.empty + new {} + end + + def initialize(&block) + @cb = block + end + + def onMessage(message) + @cb.call(message.getData.to_s, message.getReplyTo, message.getSubject) + end + end + + class ConnectionListener + include ::Java::IoNatsClient::ConnectionListener + + def initialize + @on_reconnect_cb = lambda {} + @on_disconnect_cb = lambda {} + @on_close_cb = lambda {} + end + + def on_close(&block); @on_close_cb = block; end + def on_disconnect(&block); @on_disconnect_cb = block; end + def on_reconnect(&block); @on_reconnect_cb = block; end + + def connectionEvent(conn, event_type) + case event_type + when ::Java::IoNatsClient::ConnectionListener::Events::RECONNECTED + @on_reconnect_cb.call + when ::Java::IoNatsClient::ConnectionListener::Events::DISCONNECTED + @on_disconnect_cb.call + when ::Java::IoNatsClient::ConnectionListener::Events::CLOSED + @on_close_cb.call + end + end + end + + class ErrorListener + include ::Java::IoNatsClient::ErrorListener + + def initialize + @on_error_cb = lambda { |_error| } + @on_exception_cb = lambda { |_exception| } + @on_slow_consumer_cb = lambda { |_consumer| } + end + + def on_error(&block) + return if block.nil? || block.arity != 1 + @on_error_cb = block + end + + def on_exception(&block) + return if block.nil? || block.arity != 1 + @on_exception_cb = block + end + + def on_slow_consumer(&block) + return if block.nil? || block.arity != 1 + @on_slow_consumer_cb = block + end + + def errorOccurred(_conn, error); @on_error_cb.call(error); end + def exceptionOccurred(_conn, exception); @on_exception_cb.call(exception); end + def slowConsumerDetected(_conn, consumer); @on_slow_consumer_cb.call(consumer); end + end class Message attr_reader :data, :subject, :reply @@ -26,10 +91,9 @@ def initialize(nats_message) end def initialize - @on_error_cb = lambda {|error|} - @on_reconnect_cb = lambda {} - @on_disconnect_cb = lambda {} - @on_close_cb = lambda {} + @connection_listener = ConnectionListener.new + @error_listener = ErrorListener.new + @options = nil @subz_cbs = {} @subz_mutex = ::Mutex.new @@ -39,35 +103,26 @@ def connect(options = {}) @options ||= options servers = options[:servers] || ["nats://localhost:4222"] - servers = [servers].flatten.map { |uri_string| java.net.URI.new(uri_string) } - connection_factory = ::Java::IoNatsClient::ConnectionFactory.new - connection_factory.setServers(servers) - connection_factory.setMaxReconnect(options[:max_reconnect_attempts]) + servers = [servers].flatten + + builder = ::Java::IoNatsClient::Options::Builder.new + builder.servers(servers) + builder.maxReconnects(options[:max_reconnect_attempts]) + builder.errorListener(@error_listener) # Shrink the pending buffer to always raise an error and let the caller retry. if options[:disable_reconnect_buffer] - connection_factory.setReconnectBufSize(1) + builder.reconnectBufferSize(1) end - # Setup callbacks - connection_factory.setDisconnectedCallback { |event| @on_disconnect_cb.call } - connection_factory.setReconnectedCallback { |_event| @on_reconnect_cb.call } - connection_factory.setClosedCallback { |_event| @on_close_cb.call } - connection_factory.setExceptionHandler { |error| @on_error_cb.call(error) } - # Setup ssl context if we're using tls if options[:uses_tls] ssl_context = create_ssl_context(options) - connection_factory.setSecure(true) - connection_factory.setSSLContext(ssl_context) + builder.sslContext(ssl_context) end - @connection = connection_factory.createConnection - - # We're going to spawn a consumer and supervisor - @work_queue = @connection.createMsgChannel - spwan_supervisor_and_consumer - + @connection = ::Java::IoNatsClient::Nats.connect(builder.build) + @dispatcher = @connection.createDispatcher(MessageHandlerProxy.empty) @connection end @@ -80,64 +135,56 @@ def connection # Do not depend on #close for a graceful disconnect. def close - @connection.close rescue nil + if @connection + @connection.closeDispatcher(@dispatcher) rescue nil + @connection.close rescue nil + end + @dispatcher = nil @connection = nil - @supervisor.kill rescue nil - @supervisor = nil - @consumer.kill rescue nil - @supervisor = nil end def flush(timeout_sec = 0.5) - connection.flush(timeout_sec * 1000) + duration = duration_in_ms(timeout_sec * 1000) + connection.flush(duration) end def next_message(sub, timeout_sec) - nats_message = sub.nextMessage(timeout_sec * 1000) + duration = duration_in_ms(timeout_sec * 1000) + nats_message = sub.nextMessage(duration) return nil unless nats_message Message.new(nats_message) end def publish(subject, data, mailbox = nil) # The "true" here is to force flush. May not need this. - connection.publish(subject, mailbox, data.to_java_bytes, true) + connection.publish(subject, mailbox, data.to_java_bytes) + connection.flush(nil) end def subscribe(subject, options = {}, &block) queue = options[:queue] max = options[:max] - work_queue = nil - # We pass our work queue for processing async work because java nats - # uses a cahced thread pool: 1 thread per async subscription. - # Sync subs need their own queue so work is not processed async. - work_queue = block.nil? ? connection.createMsgChannel : @work_queue - sub = connection.subscribe(subject, queue, nil, work_queue) - - # Register the block callback. We only lock to save the callback. + if block - @subz_mutex.synchronize do - @subz_cbs[sub.getSid] = block - end + handler = MessageHandlerProxy.new(&block) + sub = subscribe_using_subscription_dispatcher(subject, queue, handler) + # Auto unsub if max message option was provided. + dispatcher.unsubscribe(sub, max) if max + sub + else + sub = subscribe_using_connection(subject, queue) + sub.unsubscribe(max) if max + sub end - - # Auto unsub if max message option was provided. - sub.autoUnsubscribe(max) if max - - sub end def unsubscribe(sub) return if sub.nil? - - # Cleanup our async callback - if @subz_cbs[sub.getSid] - @subz_mutex.synchronize do - @subz_cbs.delete(sub.getSid) - end + if sub.getDispatcher + dispatcher.unsubscribe(sub) + else + sub.unsubscribe() end - - # The "true" here is to ignore and invalid conn. - sub.unsubscribe(true) end def new_inbox @@ -145,69 +192,23 @@ def new_inbox end def on_reconnect(&cb) - @on_reconnect_cb = cb + @connection_listener.on_reconnect(&cb) end def on_disconnect(&cb) - @on_disconnect_cb = cb + @connection_listener.on_disconnect(&cb) end def on_error(&cb) - @on_error_cb = cb + @error_listener.on_exception(&cb) end def on_close(&cb) - @on_close_cb = cb + @connection_listener.on_close(&cb) end private - def spwan_supervisor_and_consumer - spawn_consumer - @supervisor = ::Thread.new do - loop do - begin - sleep 1 - next if @consumer && @consumer.alive? - # We need to recreate the consumer thread - @consumer.kill if @consumer - spawn_consumer - rescue => error - @on_error_cb.call(error) - end - end - end - end - - def spawn_consumer - @consumer = ::Thread.new do - loop do - begin - message = @work_queue.take - next unless message - sub = message.getSubscription - - # We have to update the subscription stats so we're not considered a slow consumer. - begin - sub.lock - sub.pMsgs -= 1 - sub.pBytes -= message.getData.length if message.getData - sub.delivered += 1 unless sub.isClosed - ensure - sub.unlock - end - - # We don't need t - callback = @subz_cbs[sub.getSid] - next unless callback - callback.call(message.getData.to_s, message.getReplyTo, message.getSubject) - rescue => error - @on_error_cb.call(error) - end - end - end - end - # Jruby-openssl depends on bouncycastle so our lives don't suck super bad def read_pem_object_from_file(path) fail ::ArgumentError, "Tried to read a PEM key or cert with path nil" if path.nil? @@ -251,6 +252,30 @@ def create_ssl_context(options) context.init(key_manager.getKeyManagers, trust_manager.getTrustManagers, nil) context end + + def duration_in_ms(ms) + ::Java::JavaTime::Duration.ofMillis(ms) + end + + def subscribe_using_connection(subject, queue) + if queue + connection.subscribe(subject, queue) + else + connection.subscribe(subject) + end + end + + def subscribe_using_subscription_dispatcher(subject, queue, handler) + if queue + dispatcher.java_send(:subscribe, + [::Java::JavaLang::String, ::Java::JavaLang::String, ::Java::IoNatsClient::MessageHandler], + subject, queue, handler) + else + dispatcher.java_send(:subscribe, + [::Java::JavaLang::String, ::Java::IoNatsClient::MessageHandler], + subject, handler) + end + end end end end diff --git a/protobuf-nats.gemspec b/protobuf-nats.gemspec index 7a72871..8d63d11 100644 --- a/protobuf-nats.gemspec +++ b/protobuf-nats.gemspec @@ -35,7 +35,7 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency "protobuf", "~> 3.7", ">= 3.7.2" spec.add_runtime_dependency "nats-pure", "~> 0.3", "< 0.4" - spec.add_development_dependency "bundler", "~> 1.14" + spec.add_development_dependency "bundler" spec.add_development_dependency "rake", "~> 10.0" spec.add_development_dependency "rspec" spec.add_development_dependency "benchmark-ips" diff --git a/spec/protobuf/nats/client_spec.rb b/spec/protobuf/nats/client_spec.rb index 4bf6e26..2ab9fa4 100644 --- a/spec/protobuf/nats/client_spec.rb +++ b/spec/protobuf/nats/client_spec.rb @@ -112,7 +112,7 @@ class ExampleServiceClass; end let(:ack) { ::Protobuf::Nats::Messages::ACK } let(:nack) { ::Protobuf::Nats::Messages::NACK } let(:response) { "final count down" } - let(:subscription_inbox) { ::Protobuf::Nats::Client::SubscriptionInbox.new(double("sub", :is_valid => true), "INBOX") } + let(:subscription_inbox) { ::Protobuf::Nats::Client::SubscriptionInbox.new(double("sub", :is_active => true), "INBOX") } before do allow(::Protobuf::Nats).to receive(:client_nats_connection).and_return(client) @@ -158,7 +158,7 @@ class ExampleServiceClass; end end describe "#send_request" do - let(:subscription_inbox) { ::Protobuf::Nats::Client::SubscriptionInbox.new(double("sub", :is_valid => true), "INBOX") } + let(:subscription_inbox) { ::Protobuf::Nats::Client::SubscriptionInbox.new(double("sub", :is_active => true), "INBOX") } before do allow_any_instance_of(::Protobuf::Nats::Client).to receive(:new_subscription_inbox).and_return(subscription_inbox) diff --git a/spec/protobuf/nats/jnats_spec.rb b/spec/protobuf/nats/jnats_spec.rb index 9d6b31d..b664bc4 100644 --- a/spec/protobuf/nats/jnats_spec.rb +++ b/spec/protobuf/nats/jnats_spec.rb @@ -4,6 +4,63 @@ require "protobuf/nats/jnats" describe ::Protobuf::Nats::JNats do + describe "#subscribe and #publish" do + before { subject.connection } + after { subject.close } + + it "can async subscribe multiple times" do + times_received = 0 + lock = Mutex.new + subject.subscribe("yolo.123") do + lock.synchronize { times_received += 1 } + end + subject.publish("yolo.123", "test") + verify_expectation_within(1) do + expect(times_received).to eq(1) + end + end + + it "can sync subscribe" do + expected_data = ::SecureRandom.uuid + sub = subject.subscribe("yolo.345") + subject.publish("yolo.345", expected_data) + msg = subject.next_message(sub, 0.1) + expect(msg.data).to eq(expected_data) + end + end + + describe "#next_message" do + before { subject.connection } + after { subject.close } + + it "returns nil when a timeout has expired" do + sub = subject.subscribe("yolo.345") + msg = subject.next_message(sub, 0.1) + expect(msg).to be_nil + end + end + + describe "#unsubscribe" do + before { subject.connection } + after { subject.close } + + it "can unsub from an async subscription" do + # This verifies the dispatcher is called correctly. + async_sub = subject.subscribe("yolo.abc.async") {} + expect(async_sub.is_active).to eq(true) + subject.unsubscribe(async_sub) + expect(async_sub.is_active).to eq(false) + end + + it "can unsub from a sync subscription" do + # This verifies the dispatcher is NOT called. + sub = subject.subscribe("yolo.abc.not_sync") + expect(sub.is_active).to eq(true) + subject.unsubscribe(sub) + expect(sub.is_active).to eq(false) + end + end + describe "#connection" do it "calls #connect when no @connection exists" do expect(subject).to receive(:connect).with({}) @@ -11,7 +68,7 @@ end it "attempts to reconnect with options given to #connect" do - allow(::Java::IoNatsClient::ConnectionFactory).to receive(:new).and_raise(::RuntimeError) + allow(::Java::IoNatsClient::Nats).to receive(:connect).and_raise(::RuntimeError) provided_options = {:yolo => "ok"} subject.connect(provided_options) rescue nil expect(subject.options).to eq(provided_options) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 6a0f2e3..b38a125 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -20,3 +20,17 @@ allow(Protobuf::Nats).to receive(:start_client_nats_connection) end end + +def verify_expectation_within(number_of_seconds, check_every = 0.02) + waiting_since = ::Time.now + begin + sleep check_every + yield + rescue RSpec::Expectations::ExpectationNotMetError => e + if ::Time.now - waiting_since > number_of_seconds + raise e + else + retry + end + end +end