From 40f7cd3e4f3a9df23b909a8c5d562e0572008707 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Tue, 7 Nov 2017 15:52:29 -0500 Subject: [PATCH 1/7] Test the client nats connection before sending a request --- lib/protobuf/nats/client.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index d5cd1a0..3ac33e4 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -126,6 +126,9 @@ def use_subscription_pooling? end def send_request + # This will ensure the client is started. + ::Protobuf::Nats.start_client_nats_connection + if use_subscription_pooling? available = self.class.subscription_pool.instance_variable_get("@available") ::ActiveSupport::Notifications.instrument "client.subscription_pool_available_size.protobuf-nats", available.length From a7b73323a530725f1b1ebc7cd9e13bda146d8f7f Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Tue, 7 Nov 2017 15:52:52 -0500 Subject: [PATCH 2/7] Only store the client nats connection if everything loaded correctly --- lib/protobuf/nats.rb | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/lib/protobuf/nats.rb b/lib/protobuf/nats.rb index 76d72ed..76e46ff 100644 --- a/lib/protobuf/nats.rb +++ b/lib/protobuf/nats.rb @@ -86,33 +86,34 @@ def self.start_client_nats_connection # Disable publisher pending buffer on reconnect options = config.connection_options.merge(:disable_reconnect_buffer => true) + client = NatsClient.new begin - @client_nats_connection = NatsClient.new - @client_nats_connection.connect(options) + client.connect(options) rescue ::Protobuf::Nats::Errors::IOException - @client_nats_connection = nil raise end # Ensure we have a valid connection to the NATS server. - @client_nats_connection.flush(5) + client.flush(5) - @client_nats_connection.on_disconnect do + client.on_disconnect do logger.warn("Client NATS connection was disconnected") end - @client_nats_connection.on_reconnect do + client.on_reconnect do logger.warn("Client NATS connection was reconnected") end - @client_nats_connection.on_close do + client.on_close do logger.warn("Client NATS connection was closed") end - @client_nats_connection.on_error do |error| + client.on_error do |error| notify_error_callbacks(error) end + @client_nats_connection = client + true end end From 1d5d7ca98127e510ea725984789f0d760995df4f Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Tue, 7 Nov 2017 15:54:39 -0500 Subject: [PATCH 3/7] Ensure the start client nats connection memo checks for an instance of a nats connection --- lib/protobuf/nats.rb | 58 ++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/lib/protobuf/nats.rb b/lib/protobuf/nats.rb index 76e46ff..895b236 100644 --- a/lib/protobuf/nats.rb +++ b/lib/protobuf/nats.rb @@ -78,44 +78,44 @@ def self.subscription_key(service_klass, service_method) end def self.start_client_nats_connection - @start_client_nats_connection ||= begin - GET_CONNECTED_MUTEX.synchronize do - break true if @client_nats_connection - break true if @start_client_nats_connection + return true if @start_client_nats_connection && @client_nats_connection - # Disable publisher pending buffer on reconnect - options = config.connection_options.merge(:disable_reconnect_buffer => true) + GET_CONNECTED_MUTEX.synchronize do + break true if @client_nats_connection + break true if @start_client_nats_connection - client = NatsClient.new - begin - client.connect(options) - rescue ::Protobuf::Nats::Errors::IOException - raise - end + # Disable publisher pending buffer on reconnect + options = config.connection_options.merge(:disable_reconnect_buffer => true) - # Ensure we have a valid connection to the NATS server. - client.flush(5) - - client.on_disconnect do - logger.warn("Client NATS connection was disconnected") - end + client = NatsClient.new + begin + client.connect(options) + rescue ::Protobuf::Nats::Errors::IOException + raise + end - client.on_reconnect do - logger.warn("Client NATS connection was reconnected") - end + # Ensure we have a valid connection to the NATS server. + client.flush(5) - client.on_close do - logger.warn("Client NATS connection was closed") - end + client.on_disconnect do + logger.warn("Client NATS connection was disconnected") + end - client.on_error do |error| - notify_error_callbacks(error) - end + client.on_reconnect do + logger.warn("Client NATS connection was reconnected") + end - @client_nats_connection = client + client.on_close do + logger.warn("Client NATS connection was closed") + end - true + client.on_error do |error| + notify_error_callbacks(error) end + + @client_nats_connection = client + + true end end From 4ec7fd787b6653bf95ed0ab752175e7f035e3c30 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Tue, 7 Nov 2017 15:55:14 -0500 Subject: [PATCH 4/7] Allow an auto-connect to happen if the @connection is nil --- lib/protobuf/nats/jnats.rb | 22 +++++++++++++++------- spec/protobuf/nats/jnats_spec.rb | 24 ++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 7 deletions(-) create mode 100644 spec/protobuf/nats/jnats_spec.rb diff --git a/lib/protobuf/nats/jnats.rb b/lib/protobuf/nats/jnats.rb index e7dfad5..78b1116 100644 --- a/lib/protobuf/nats/jnats.rb +++ b/lib/protobuf/nats/jnats.rb @@ -13,7 +13,7 @@ class Java::IoNatsClient::SubscriptionImpl module Protobuf module Nats class JNats - attr_reader :connection + attr_reader :connection, :options class Message attr_reader :data, :subject, :reply @@ -30,11 +30,14 @@ def initialize @on_reconnect_cb = lambda {} @on_disconnect_cb = lambda {} @on_close_cb = lambda {} + @options = nil @subz_cbs = {} @subz_mutex = ::Mutex.new end def connect(options = {}) + @options ||= options + servers = options[:servers] || ["nats://localhost:4222"] servers = [servers].flatten.map { |uri_string| java.net.URI.new(uri_string) } connection_factory = ::Java::IoNatsClient::ConnectionFactory.new @@ -68,9 +71,14 @@ def connect(options = {}) @connection end - # Do not depend on #close for a greaceful disconnect. + def connection + return @connection unless @connection.nil? + connect(options || {}) + end + + # Do not depend on #close for a graceful disconnect. def close - @connection.close + @connection.close rescue nil @connection = nil @supervisor.kill rescue nil @supervisor = nil @@ -79,7 +87,7 @@ def close end def flush(timeout_sec = 0.5) - @connection.flush(timeout_sec * 1000) + connection.flush(timeout_sec * 1000) end def next_message(sub, timeout_sec) @@ -90,7 +98,7 @@ def next_message(sub, timeout_sec) def publish(subject, data, mailbox = nil) # The "true" here is to force flush. May not need this. - @connection.publish(subject, mailbox, data.to_java_bytes, true) + connection.publish(subject, mailbox, data.to_java_bytes, true) end def subscribe(subject, options = {}, &block) @@ -100,8 +108,8 @@ def subscribe(subject, options = {}, &block) # We pass our work queue for processing async work because java nats # uses a cahced thread pool: 1 thread per async subscription. # Sync subs need their own queue so work is not processed async. - work_queue = block.nil? ? @connection.createMsgChannel : @work_queue - sub = @connection.subscribe(subject, queue, nil, work_queue) + work_queue = block.nil? ? connection.createMsgChannel : @work_queue + sub = connection.subscribe(subject, queue, nil, work_queue) # Register the block callback. We only lock to save the callback. if block diff --git a/spec/protobuf/nats/jnats_spec.rb b/spec/protobuf/nats/jnats_spec.rb new file mode 100644 index 0000000..9d6b31d --- /dev/null +++ b/spec/protobuf/nats/jnats_spec.rb @@ -0,0 +1,24 @@ +require "rspec" + +if defined?(JRUBY_VERSION) + require "protobuf/nats/jnats" + + describe ::Protobuf::Nats::JNats do + describe "#connection" do + it "calls #connect when no @connection exists" do + expect(subject).to receive(:connect).with({}) + subject.connection + end + + it "attempts to reconnect with options given to #connect" do + allow(::Java::IoNatsClient::ConnectionFactory).to receive(:new).and_raise(::RuntimeError) + provided_options = {:yolo => "ok"} + subject.connect(provided_options) rescue nil + expect(subject.options).to eq(provided_options) + + expect(subject).to receive(:connect).with(provided_options) + subject.connection rescue nil + end + end + end +end From b79b11596937e1cd179e71e8e61f741afab87f4b Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Tue, 7 Nov 2017 16:04:16 -0500 Subject: [PATCH 5/7] Formatting --- lib/protobuf/nats/jnats.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/protobuf/nats/jnats.rb b/lib/protobuf/nats/jnats.rb index 78b1116..58bd04b 100644 --- a/lib/protobuf/nats/jnats.rb +++ b/lib/protobuf/nats/jnats.rb @@ -87,7 +87,7 @@ def close end def flush(timeout_sec = 0.5) - connection.flush(timeout_sec * 1000) + connection.flush(timeout_sec * 1000) end def next_message(sub, timeout_sec) From d3b63c295d27feac5b627ca210da8743d911a040 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Tue, 7 Nov 2017 16:05:06 -0500 Subject: [PATCH 6/7] Remove begin/resuce/end block now that we don't need to change state on resuce --- lib/protobuf/nats.rb | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/lib/protobuf/nats.rb b/lib/protobuf/nats.rb index 895b236..78ca3d6 100644 --- a/lib/protobuf/nats.rb +++ b/lib/protobuf/nats.rb @@ -88,11 +88,7 @@ def self.start_client_nats_connection options = config.connection_options.merge(:disable_reconnect_buffer => true) client = NatsClient.new - begin - client.connect(options) - rescue ::Protobuf::Nats::Errors::IOException - raise - end + client.connect(options) # Ensure we have a valid connection to the NATS server. client.flush(5) From e3231aa0fb98e1827684df0cebb6be12e4e8a1a1 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Tue, 7 Nov 2017 16:34:17 -0500 Subject: [PATCH 7/7] Cleanup before attempting to restart --- lib/protobuf/nats/jnats.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/protobuf/nats/jnats.rb b/lib/protobuf/nats/jnats.rb index 58bd04b..4f42bf3 100644 --- a/lib/protobuf/nats/jnats.rb +++ b/lib/protobuf/nats/jnats.rb @@ -73,6 +73,8 @@ def connect(options = {}) def connection return @connection unless @connection.nil? + # Ensure no consumer or supervisor are running + close connect(options || {}) end