Skip to content

Commit 074f7e7

Browse files
committed
Use shared concurrency for performance / simplicity
Fixes #71
1 parent 19825e1 commit 074f7e7

File tree

4 files changed

+35
-9
lines changed

4 files changed

+35
-9
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 5.1.0
2+
_ Use shared concurrency / multiple channels for performance
3+
14
## 5.0.3
25
- Update gemspec summary
36

lib/logstash/outputs/rabbitmq.rb

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ class RabbitMQ < LogStash::Outputs::Base
1717
include LogStash::PluginMixins::RabbitMQConnection
1818

1919
config_name "rabbitmq"
20+
21+
concurrency :shared
2022

2123
# The default codec for this plugin is JSON. You can override this to suit your particular needs however.
2224
default :codec, "json"
@@ -44,22 +46,24 @@ class RabbitMQ < LogStash::Outputs::Base
4446
def register
4547
connect!
4648
@hare_info.exchange = declare_exchange!(@hare_info.channel, @exchange, @exchange_type, @durable)
47-
@codec.on_event(&method(:publish))
49+
# The connection close should close all channels, so it is safe to store thread locals here without closing them
50+
@thread_local_channel = java.lang.ThreadLocal.new
51+
@thread_local_exchange = java.lang.ThreadLocal.new
4852
end
4953

5054
def symbolize(myhash)
5155
Hash[myhash.map{|(k,v)| [k.to_sym,v]}]
5256
end
5357

54-
def receive(event)
55-
@codec.encode(event)
56-
rescue StandardError => e
57-
@logger.warn("Error encoding event", :exception => e, :event => event)
58+
def multi_receive_encoded(events_and_data)
59+
events_and_data.each do |event, data|
60+
publish(event, data)
61+
end
5862
end
5963

6064
def publish(event, message)
6165
raise ArgumentError, "No exchange set in HareInfo!!!" unless @hare_info.exchange
62-
@hare_info.exchange.publish(message, :routing_key => event.sprintf(@key), :properties => symbolize(@message_properties.merge(:persistent => @persistent)))
66+
local_exchange.publish(message, :routing_key => event.sprintf(@key), :properties => symbolize(@message_properties.merge(:persistent => @persistent)))
6367
rescue MarchHare::Exception, IOError, AlreadyClosedException, TimeoutException => e
6468
@logger.error("Error while publishing. Will retry.",
6569
:message => e.message,
@@ -70,6 +74,24 @@ def publish(event, message)
7074
retry
7175
end
7276

77+
def local_exchange
78+
exchange = @thread_local_exchange.get
79+
if !exchange
80+
exchange = declare_exchange!(local_channel, @exchange, @exchange_type, @durable)
81+
@thread_local_exchange.set(exchange)
82+
end
83+
exchange
84+
end
85+
86+
def local_channel
87+
channel = @thread_local_channel.get
88+
if !channel
89+
channel = @hare_info.connection.create_channel
90+
@thread_local_channel.set(channel)
91+
end
92+
channel
93+
end
94+
7395
def close
7496
close_connection
7597
end

logstash-output-rabbitmq.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-rabbitmq'
3-
s.version = '5.0.3'
3+
s.version = '5.1.0'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "Pushes events to a RabbitMQ exchange"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/outputs/rabbitmq_spec.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ def spawn_and_wait(instance)
163163
sleep 1
164164
end
165165
let(:message) { LogStash::Event.new("message" => "Foo Message", "extra_field" => "Blah") }
166+
let(:encoded) { message.to_json }
166167
let(:test_connection) { MarchHare.connect(instance.send(:rabbitmq_settings)) }
167168
let(:test_channel) { test_connection.create_channel }
168169
let(:test_queue) {
@@ -196,7 +197,7 @@ def spawn_and_wait(instance)
196197
end
197198

198199
it 'applies per message settings' do
199-
instance.receive(message)
200+
instance.multi_receive_encoded([[message, encoded]])
200201
sleep 1.0
201202

202203
message, payload = test_queue.pop
@@ -213,7 +214,7 @@ def spawn_and_wait(instance)
213214
@received = payload
214215
end
215216

216-
instance.receive(message)
217+
instance.multi_receive_encoded([[message, encoded]])
217218

218219
until @received
219220
sleep 1

0 commit comments

Comments
 (0)