Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dealing with createMsgChannel errors #35

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 33 additions & 36 deletions lib/protobuf/nats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,43 +78,40 @@ def self.subscription_key(service_klass, service_method)
end

def self.start_client_nats_connection
@start_client_nats_connection ||= begin
GET_CONNECTED_MUTEX.synchronize do
break true if @client_nats_connection
break true if @start_client_nats_connection

# Disable publisher pending buffer on reconnect
options = config.connection_options.merge(:disable_reconnect_buffer => true)

begin
@client_nats_connection = NatsClient.new
@client_nats_connection.connect(options)
rescue ::Protobuf::Nats::Errors::IOException
@client_nats_connection = nil
raise
end

# Ensure we have a valid connection to the NATS server.
@client_nats_connection.flush(5)

@client_nats_connection.on_disconnect do
logger.warn("Client NATS connection was disconnected")
end

@client_nats_connection.on_reconnect do
logger.warn("Client NATS connection was reconnected")
end

@client_nats_connection.on_close do
logger.warn("Client NATS connection was closed")
end

@client_nats_connection.on_error do |error|
notify_error_callbacks(error)
end

true
return true if @start_client_nats_connection && @client_nats_connection

GET_CONNECTED_MUTEX.synchronize do
break true if @client_nats_connection
break true if @start_client_nats_connection

# Disable publisher pending buffer on reconnect
options = config.connection_options.merge(:disable_reconnect_buffer => true)

client = NatsClient.new
client.connect(options)

# Ensure we have a valid connection to the NATS server.
client.flush(5)

client.on_disconnect do
logger.warn("Client NATS connection was disconnected")
end

client.on_reconnect do
logger.warn("Client NATS connection was reconnected")
end

client.on_close do
logger.warn("Client NATS connection was closed")
end

client.on_error do |error|
notify_error_callbacks(error)
end

@client_nats_connection = client

true
end
end

Expand Down
3 changes: 3 additions & 0 deletions lib/protobuf/nats/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def use_subscription_pooling?
end

def send_request
# This will ensure the client is started.
::Protobuf::Nats.start_client_nats_connection

if use_subscription_pooling?
available = self.class.subscription_pool.instance_variable_get("@available")
::ActiveSupport::Notifications.instrument "client.subscription_pool_available_size.protobuf-nats", available.length
Expand Down
24 changes: 17 additions & 7 deletions lib/protobuf/nats/jnats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Java::IoNatsClient::SubscriptionImpl
module Protobuf
module Nats
class JNats
attr_reader :connection
attr_reader :connection, :options

class Message
attr_reader :data, :subject, :reply
Expand All @@ -30,11 +30,14 @@ def initialize
@on_reconnect_cb = lambda {}
@on_disconnect_cb = lambda {}
@on_close_cb = lambda {}
@options = nil
@subz_cbs = {}
@subz_mutex = ::Mutex.new
end

def connect(options = {})
@options ||= options

servers = options[:servers] || ["nats://localhost:4222"]
servers = [servers].flatten.map { |uri_string| java.net.URI.new(uri_string) }
connection_factory = ::Java::IoNatsClient::ConnectionFactory.new
Expand Down Expand Up @@ -68,9 +71,16 @@ def connect(options = {})
@connection
end

# Do not depend on #close for a greaceful disconnect.
def connection
return @connection unless @connection.nil?
# Ensure no consumer or supervisor are running
close
connect(options || {})
end

# Do not depend on #close for a graceful disconnect.
def close
@connection.close
@connection.close rescue nil
@connection = nil
@supervisor.kill rescue nil
@supervisor = nil
Expand All @@ -79,7 +89,7 @@ def close
end

def flush(timeout_sec = 0.5)
@connection.flush(timeout_sec * 1000)
connection.flush(timeout_sec * 1000)
end

def next_message(sub, timeout_sec)
Expand All @@ -90,7 +100,7 @@ def next_message(sub, timeout_sec)

def publish(subject, data, mailbox = nil)
# The "true" here is to force flush. May not need this.
@connection.publish(subject, mailbox, data.to_java_bytes, true)
connection.publish(subject, mailbox, data.to_java_bytes, true)
end

def subscribe(subject, options = {}, &block)
Expand All @@ -100,8 +110,8 @@ def subscribe(subject, options = {}, &block)
# We pass our work queue for processing async work because java nats
# uses a cahced thread pool: 1 thread per async subscription.
# Sync subs need their own queue so work is not processed async.
work_queue = block.nil? ? @connection.createMsgChannel : @work_queue
sub = @connection.subscribe(subject, queue, nil, work_queue)
work_queue = block.nil? ? connection.createMsgChannel : @work_queue
sub = connection.subscribe(subject, queue, nil, work_queue)

# Register the block callback. We only lock to save the callback.
if block
Expand Down
24 changes: 24 additions & 0 deletions spec/protobuf/nats/jnats_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
require "rspec"

if defined?(JRUBY_VERSION)
require "protobuf/nats/jnats"

describe ::Protobuf::Nats::JNats do
describe "#connection" do
it "calls #connect when no @connection exists" do
expect(subject).to receive(:connect).with({})
subject.connection
end

it "attempts to reconnect with options given to #connect" do
allow(::Java::IoNatsClient::ConnectionFactory).to receive(:new).and_raise(::RuntimeError)
provided_options = {:yolo => "ok"}
subject.connect(provided_options) rescue nil
expect(subject.options).to eq(provided_options)

expect(subject).to receive(:connect).with(provided_options)
subject.connection rescue nil
end
end
end
end