From 36b674c9bd5f9fd0d9633b21f519802dc79a63f6 Mon Sep 17 00:00:00 2001 From: Pierre Marot Date: Thu, 29 Nov 2018 13:46:17 +0100 Subject: [PATCH 1/3] add support for variable in message properties and message headers --- examples/sample.conf | 11 +++++++++++ lib/logstash/outputs/rabbitmq.rb | 9 ++++++--- spec/outputs/rabbitmq_spec.rb | 21 +++++++++++++++------ 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/examples/sample.conf b/examples/sample.conf index 16f781a..79f0177 100644 --- a/examples/sample.conf +++ b/examples/sample.conf @@ -2,10 +2,21 @@ input { stdin {} } +filter { + mutate { + add_field => {"[@metadata][eventType]" => "MessageType"} + } +} + output { rabbitmq { host => "127.0.0.1" exchange => "foo" exchange_type => "topic" + message_properties => { + "content_encoding" => "utf8" + "content_type" => "application/json" + "type" => "%{[@metadata][eventType]}" + } } } \ No newline at end of file diff --git a/lib/logstash/outputs/rabbitmq.rb b/lib/logstash/outputs/rabbitmq.rb index 36224ae..b3377ad 100644 --- a/lib/logstash/outputs/rabbitmq.rb +++ b/lib/logstash/outputs/rabbitmq.rb @@ -51,8 +51,9 @@ def register @thread_local_exchange = java.lang.ThreadLocal.new end - def symbolize(myhash) - Hash[myhash.map{|(k,v)| [k.to_sym,v]}] + def symbolize(myhash, event) + Hash[myhash.map{|(k,v)| [k.to_sym, if v.is_a? String then event.sprintf(v) + elsif v.is_a? Hash then symbolize(v, event) else v end]}] end def multi_receive_encoded(events_and_data) @@ -63,7 +64,9 @@ def multi_receive_encoded(events_and_data) def publish(event, message) raise ArgumentError, "No exchange set in HareInfo!!!" unless @hare_info.exchange - local_exchange.publish(message, :routing_key => event.sprintf(@key), :properties => symbolize(@message_properties.merge(:persistent => @persistent))) + local_exchange.publish(message, :routing_key => event.sprintf(@key), :properties => symbolize(@message_properties.merge( + :persistent => @persistent + ), event)) rescue MarchHare::Exception, IOError, AlreadyClosedException, TimeoutException => e @logger.error("Error while publishing. Will retry.", :message => e.message, diff --git a/spec/outputs/rabbitmq_spec.rb b/spec/outputs/rabbitmq_spec.rb index d864adb..2527bf2 100644 --- a/spec/outputs/rabbitmq_spec.rb +++ b/spec/outputs/rabbitmq_spec.rb @@ -12,6 +12,11 @@ let(:exchange) { "myexchange" } let(:key) { "mykey" } let(:persistent) { true } + let(:event_type) { "MessageType" } + let(:var_event_type) { "%{[@metadata][event_type]}" } + let(:event_metadata) { + {"event_type" => event_type } + } let(:rabbitmq_settings) { { "host" => host, @@ -19,7 +24,10 @@ "exchange_type" => exchange_type, "exchange" => exchange, "key" => key, - "persistent" => persistent + "persistent" => persistent, + "message_properties" => { + "type" => var_event_type + } } } let(:instance) { klass.new(rabbitmq_settings) } @@ -39,7 +47,7 @@ end it 'should send the correct metadata (twice)' do - expected_metadata = {:routing_key => event.sprintf(key), :properties => {:persistent => persistent }} + expected_metadata = {:routing_key => event.sprintf(key), :properties => {:persistent => persistent, :type => event_type }} expect(exchange).to have_received(:publish).with(anything, expected_metadata).twice end end @@ -74,7 +82,7 @@ end describe "#publish_encoded" do - let(:event) { LogStash::Event.new("foo" => "bar") } + let(:event) { LogStash::Event.new("foo" => "bar", "@metadata" => {"event_type" => event_type }) } let(:sprinted_key) { double("sprinted key") } let(:encoded_event) { LogStash::Json.dump(event) } @@ -82,6 +90,7 @@ before do allow(exchange).to receive(:publish).with(any_args) allow(event).to receive(:sprintf).with(key).and_return(sprinted_key) + allow(event).to receive(:sprintf).with(var_event_type).and_return(event_type) instance.send(:publish, event, encoded_event) end @@ -90,7 +99,7 @@ end it "should send the correct metadata" do - expected_metadata = {:routing_key => sprinted_key, :properties => {:persistent => persistent }} + expected_metadata = {:routing_key => sprinted_key, :properties => {:persistent => persistent, :type => event_type }} expect(exchange).to have_received(:publish).with(anything, expected_metadata) end @@ -162,7 +171,7 @@ def spawn_and_wait(instance) # Extra time to make sure the output can attach sleep 1 end - let(:message) { LogStash::Event.new("message" => "Foo Message", "extra_field" => "Blah") } + let(:message) { LogStash::Event.new("message" => "Foo Message", "extra_field" => "Blah", "@metadata" => event_metadata) } let(:encoded) { message.to_json } let(:test_connection) { MarchHare.connect(instance.send(:rabbitmq_settings)) } let(:test_channel) { test_connection.create_channel } @@ -206,7 +215,7 @@ def spawn_and_wait(instance) end describe "sending a message with an exchange specified" do - let(:message) { LogStash::Event.new("message" => "Foo Message", "extra_field" => "Blah") } + let(:message) { LogStash::Event.new("message" => "Foo Message", "extra_field" => "Blah", "@metadata" => event_metadata) } before do @received = nil From 2c66bb50578385cb85c3c732891c27ad12dcf232 Mon Sep 17 00:00:00 2001 From: Pierre Marot Date: Fri, 30 Nov 2018 07:50:36 +0100 Subject: [PATCH 2/3] define config variables in second test suite --- spec/outputs/rabbitmq_spec.rb | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/spec/outputs/rabbitmq_spec.rb b/spec/outputs/rabbitmq_spec.rb index 2527bf2..9642fbd 100644 --- a/spec/outputs/rabbitmq_spec.rb +++ b/spec/outputs/rabbitmq_spec.rb @@ -145,6 +145,11 @@ let(:exchange) { "myexchange" } let(:exchange_type) { "topic" } let(:priority) { 34 } + let(:event_type) { "MessageType" } + let(:var_event_type) { "%{[@metadata][event_type]}" } + let(:event_metadata) { + {"event_type" => event_type } + } let(:default_plugin_config) { { "host" => "127.0.0.1", @@ -153,6 +158,7 @@ "key" => "foo", "message_properties" => { "priority" => priority + "type" => var_event_type } } } @@ -211,6 +217,7 @@ def spawn_and_wait(instance) message, payload = test_queue.pop expect(message.properties.to_s).to include("priority=#{priority}") + expect(message.properties.to_s).to include("type=#{event_type}") end end From 00d4e7a6c2081d261ede6b68e33beee2bac69998 Mon Sep 17 00:00:00 2001 From: Pierre Marot Date: Fri, 30 Nov 2018 08:13:33 +0100 Subject: [PATCH 3/3] add missing comma --- spec/outputs/rabbitmq_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/outputs/rabbitmq_spec.rb b/spec/outputs/rabbitmq_spec.rb index 9642fbd..4bdb8d5 100644 --- a/spec/outputs/rabbitmq_spec.rb +++ b/spec/outputs/rabbitmq_spec.rb @@ -157,7 +157,7 @@ "exchange_type" => exchange_type, "key" => "foo", "message_properties" => { - "priority" => priority + "priority" => priority, "type" => var_event_type } }