diff --git a/CHANGELOG.md b/CHANGELOG.md index 4224468..b2b8749 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 5.1.0 + _ Use shared concurrency / multiple channels for performance + ## 5.0.3 - Update gemspec summary diff --git a/lib/logstash/outputs/rabbitmq.rb b/lib/logstash/outputs/rabbitmq.rb index 6c660e8..36224ae 100644 --- a/lib/logstash/outputs/rabbitmq.rb +++ b/lib/logstash/outputs/rabbitmq.rb @@ -17,6 +17,8 @@ class RabbitMQ < LogStash::Outputs::Base include LogStash::PluginMixins::RabbitMQConnection config_name "rabbitmq" + + concurrency :shared # The default codec for this plugin is JSON. You can override this to suit your particular needs however. default :codec, "json" @@ -44,22 +46,24 @@ class RabbitMQ < LogStash::Outputs::Base def register connect! @hare_info.exchange = declare_exchange!(@hare_info.channel, @exchange, @exchange_type, @durable) - @codec.on_event(&method(:publish)) + # The connection close should close all channels, so it is safe to store thread locals here without closing them + @thread_local_channel = java.lang.ThreadLocal.new + @thread_local_exchange = java.lang.ThreadLocal.new end def symbolize(myhash) Hash[myhash.map{|(k,v)| [k.to_sym,v]}] end - def receive(event) - @codec.encode(event) - rescue StandardError => e - @logger.warn("Error encoding event", :exception => e, :event => event) + def multi_receive_encoded(events_and_data) + events_and_data.each do |event, data| + publish(event, data) + end end def publish(event, message) raise ArgumentError, "No exchange set in HareInfo!!!" unless @hare_info.exchange - @hare_info.exchange.publish(message, :routing_key => event.sprintf(@key), :properties => symbolize(@message_properties.merge(:persistent => @persistent))) + local_exchange.publish(message, :routing_key => event.sprintf(@key), :properties => symbolize(@message_properties.merge(:persistent => @persistent))) rescue MarchHare::Exception, IOError, AlreadyClosedException, TimeoutException => e @logger.error("Error while publishing. Will retry.", :message => e.message, @@ -70,6 +74,24 @@ def publish(event, message) retry end + def local_exchange + exchange = @thread_local_exchange.get + if !exchange + exchange = declare_exchange!(local_channel, @exchange, @exchange_type, @durable) + @thread_local_exchange.set(exchange) + end + exchange + end + + def local_channel + channel = @thread_local_channel.get + if !channel + channel = @hare_info.connection.create_channel + @thread_local_channel.set(channel) + end + channel + end + def close close_connection end diff --git a/logstash-output-rabbitmq.gemspec b/logstash-output-rabbitmq.gemspec index 2ebb607..51506ee 100644 --- a/logstash-output-rabbitmq.gemspec +++ b/logstash-output-rabbitmq.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-rabbitmq' - s.version = '5.0.3' + s.version = '5.1.0' s.licenses = ['Apache License (2.0)'] s.summary = "Pushes events to a RabbitMQ exchange" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/outputs/rabbitmq_spec.rb b/spec/outputs/rabbitmq_spec.rb index 4910269..d864adb 100644 --- a/spec/outputs/rabbitmq_spec.rb +++ b/spec/outputs/rabbitmq_spec.rb @@ -163,6 +163,7 @@ def spawn_and_wait(instance) sleep 1 end let(:message) { LogStash::Event.new("message" => "Foo Message", "extra_field" => "Blah") } + let(:encoded) { message.to_json } let(:test_connection) { MarchHare.connect(instance.send(:rabbitmq_settings)) } let(:test_channel) { test_connection.create_channel } let(:test_queue) { @@ -196,7 +197,7 @@ def spawn_and_wait(instance) end it 'applies per message settings' do - instance.receive(message) + instance.multi_receive_encoded([[message, encoded]]) sleep 1.0 message, payload = test_queue.pop @@ -213,7 +214,7 @@ def spawn_and_wait(instance) @received = payload end - instance.receive(message) + instance.multi_receive_encoded([[message, encoded]]) until @received sleep 1