diff --git a/lib/protobuf/nats.rb b/lib/protobuf/nats.rb index 76d72ed..78ca3d6 100644 --- a/lib/protobuf/nats.rb +++ b/lib/protobuf/nats.rb @@ -78,43 +78,40 @@ def self.subscription_key(service_klass, service_method) end def self.start_client_nats_connection - @start_client_nats_connection ||= begin - GET_CONNECTED_MUTEX.synchronize do - break true if @client_nats_connection - break true if @start_client_nats_connection - - # Disable publisher pending buffer on reconnect - options = config.connection_options.merge(:disable_reconnect_buffer => true) - - begin - @client_nats_connection = NatsClient.new - @client_nats_connection.connect(options) - rescue ::Protobuf::Nats::Errors::IOException - @client_nats_connection = nil - raise - end - - # Ensure we have a valid connection to the NATS server. - @client_nats_connection.flush(5) - - @client_nats_connection.on_disconnect do - logger.warn("Client NATS connection was disconnected") - end - - @client_nats_connection.on_reconnect do - logger.warn("Client NATS connection was reconnected") - end - - @client_nats_connection.on_close do - logger.warn("Client NATS connection was closed") - end - - @client_nats_connection.on_error do |error| - notify_error_callbacks(error) - end - - true + return true if @start_client_nats_connection && @client_nats_connection + + GET_CONNECTED_MUTEX.synchronize do + break true if @client_nats_connection + break true if @start_client_nats_connection + + # Disable publisher pending buffer on reconnect + options = config.connection_options.merge(:disable_reconnect_buffer => true) + + client = NatsClient.new + client.connect(options) + + # Ensure we have a valid connection to the NATS server. + client.flush(5) + + client.on_disconnect do + logger.warn("Client NATS connection was disconnected") + end + + client.on_reconnect do + logger.warn("Client NATS connection was reconnected") end + + client.on_close do + logger.warn("Client NATS connection was closed") + end + + client.on_error do |error| + notify_error_callbacks(error) + end + + @client_nats_connection = client + + true end end diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index d5cd1a0..3ac33e4 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -126,6 +126,9 @@ def use_subscription_pooling? end def send_request + # This will ensure the client is started. + ::Protobuf::Nats.start_client_nats_connection + if use_subscription_pooling? available = self.class.subscription_pool.instance_variable_get("@available") ::ActiveSupport::Notifications.instrument "client.subscription_pool_available_size.protobuf-nats", available.length diff --git a/lib/protobuf/nats/jnats.rb b/lib/protobuf/nats/jnats.rb index e7dfad5..4f42bf3 100644 --- a/lib/protobuf/nats/jnats.rb +++ b/lib/protobuf/nats/jnats.rb @@ -13,7 +13,7 @@ class Java::IoNatsClient::SubscriptionImpl module Protobuf module Nats class JNats - attr_reader :connection + attr_reader :connection, :options class Message attr_reader :data, :subject, :reply @@ -30,11 +30,14 @@ def initialize @on_reconnect_cb = lambda {} @on_disconnect_cb = lambda {} @on_close_cb = lambda {} + @options = nil @subz_cbs = {} @subz_mutex = ::Mutex.new end def connect(options = {}) + @options ||= options + servers = options[:servers] || ["nats://localhost:4222"] servers = [servers].flatten.map { |uri_string| java.net.URI.new(uri_string) } connection_factory = ::Java::IoNatsClient::ConnectionFactory.new @@ -68,9 +71,16 @@ def connect(options = {}) @connection end - # Do not depend on #close for a greaceful disconnect. + def connection + return @connection unless @connection.nil? + # Ensure no consumer or supervisor are running + close + connect(options || {}) + end + + # Do not depend on #close for a graceful disconnect. def close - @connection.close + @connection.close rescue nil @connection = nil @supervisor.kill rescue nil @supervisor = nil @@ -79,7 +89,7 @@ def close end def flush(timeout_sec = 0.5) - @connection.flush(timeout_sec * 1000) + connection.flush(timeout_sec * 1000) end def next_message(sub, timeout_sec) @@ -90,7 +100,7 @@ def next_message(sub, timeout_sec) def publish(subject, data, mailbox = nil) # The "true" here is to force flush. May not need this. - @connection.publish(subject, mailbox, data.to_java_bytes, true) + connection.publish(subject, mailbox, data.to_java_bytes, true) end def subscribe(subject, options = {}, &block) @@ -100,8 +110,8 @@ def subscribe(subject, options = {}, &block) # We pass our work queue for processing async work because java nats # uses a cahced thread pool: 1 thread per async subscription. # Sync subs need their own queue so work is not processed async. - work_queue = block.nil? ? @connection.createMsgChannel : @work_queue - sub = @connection.subscribe(subject, queue, nil, work_queue) + work_queue = block.nil? ? connection.createMsgChannel : @work_queue + sub = connection.subscribe(subject, queue, nil, work_queue) # Register the block callback. We only lock to save the callback. if block diff --git a/spec/protobuf/nats/jnats_spec.rb b/spec/protobuf/nats/jnats_spec.rb new file mode 100644 index 0000000..9d6b31d --- /dev/null +++ b/spec/protobuf/nats/jnats_spec.rb @@ -0,0 +1,24 @@ +require "rspec" + +if defined?(JRUBY_VERSION) + require "protobuf/nats/jnats" + + describe ::Protobuf::Nats::JNats do + describe "#connection" do + it "calls #connect when no @connection exists" do + expect(subject).to receive(:connect).with({}) + subject.connection + end + + it "attempts to reconnect with options given to #connect" do + allow(::Java::IoNatsClient::ConnectionFactory).to receive(:new).and_raise(::RuntimeError) + provided_options = {:yolo => "ok"} + subject.connect(provided_options) rescue nil + expect(subject.options).to eq(provided_options) + + expect(subject).to receive(:connect).with(provided_options) + subject.connection rescue nil + end + end + end +end