Skip to content

Commit

Permalink
output: add support for sprintf templating in message_properties
Browse files Browse the repository at this point in the history
  • Loading branch information
yaauie committed Nov 21, 2019
1 parent 8cc3b95 commit 1030662
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## Unreleased
- Add support in Output plugin for `sprintf` templates in values provided to `message_properties` ([#8](https://github.com/logstash-plugins/logstash-integration-rabbitmq/issues/8))

## 7.0.1
- Improves Input Plugin documentation to better align with upstream guidance [#4](https://github.com/logstash-plugins/logstash-integration-rabbitmq/pull/4)

Expand Down
3 changes: 2 additions & 1 deletion docs/output-rabbitmq.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ Key to route to by default. Defaults to 'logstash'
* Value type is <<hash,hash>>
* Default value is `{}`

Add properties to be set per-message here, such as 'content_type', 'priority'
Add properties to be set per-message here, such as 'content_type', 'priority'.
Values can be {logstash-ref}/event-dependent-configuration.html#sprintf[`sprintf` templates], whose value for each message will be populated from the event.

Example:
[source,ruby]
Expand Down
64 changes: 63 additions & 1 deletion lib/logstash/outputs/rabbitmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class RabbitMQ < LogStash::Outputs::Base
config :message_properties, :validate => :hash, :default => {}

def register
@message_properties_template = MessagePropertiesTemplate.new(symbolize(@message_properties).merge(:persistent => @persistent))

connect!
@hare_info.exchange = declare_exchange!(@hare_info.channel, @exchange, @exchange_type, @durable)
# The connection close should close all channels, so it is safe to store thread locals here without closing them
Expand All @@ -67,8 +69,10 @@ def multi_receive_encoded(events_and_data)

def publish(event, message)
raise ArgumentError, "No exchange set in HareInfo!!!" unless @hare_info.exchange
routing_key = event.sprintf(@key)
message_properties = @message_properties_template.build(event)
@gated_executor.execute do
local_exchange.publish(message, :routing_key => event.sprintf(@key), :properties => symbolize(@message_properties.merge(:persistent => @persistent)))
local_exchange.publish(message, :routing_key => routing_key, :properties => message_properties)
end
rescue MarchHare::Exception, IOError, AlreadyClosedException, TimeoutException => e
@logger.error("Error while publishing. Will retry.",
Expand Down Expand Up @@ -126,6 +130,64 @@ def back_pressure_provider_for_connection(march_hare_connection)
end
end
end

##
# A `MessagePropertiesTemplate` efficiently produces per-event message properties from the
# provided template Hash.
#
# In order to efficiently reuse constant-value objects, returned values may be frozen.
class MessagePropertiesTemplate
##
# Creates a new `MessagePropertiesTemplate` from the provided `template`
# @param template [Hash{Symbol=>Object}]
def initialize(template)
constant_properties = template.reject { |_,v| templated?(v) }
variable_properties = template.select { |_,v| templated?(v) }

@constant_properties = normalize(constant_properties).freeze
@variable_properties = variable_properties
end

##
# Builds a property mapping for the given `event`, including templated values.
#
# @param event [LogStash::Event]: the event with which to populated templated values, if any.
# @return [Hash{Symbol=>Object}] a possibly-frozen properties hash for the provided `event`.
def build(event)
return @constant_properties if @variable_properties.empty?

properties = @variable_properties.each_with_object(@constant_properties.dup) do |(k,v), memo|
memo.store(k, event.sprintf(v))
end

return normalize(properties)
end

private

##
# Normalize the provided property mapping with respect to the value types the underlying
# client expects.
#
# @api private
# @param properties [Hash{Symbol=>Object}]: a possibly-frozen Hash whose values may need type-coercion.
# @return [Hash{Symbol=>Object}]
def normalize(properties)
if properties[:priority] && properties[:priority].kind_of?(String)
properties = properties.merge(:priority => properties[:priority].to_i)
end

properties
end

##
# @api private
# @param [Object]: an object, which may or may not be a template `String`
# @return [Boolean]: returns `true` IFF `value` is a template `String`
def templated?(value)
value.kind_of?(String) && value.include?('%{')
end
end
end
end
end
51 changes: 50 additions & 1 deletion spec/outputs/rabbitmq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,67 @@
before do
allow(exchange).to receive(:publish).with(any_args)
allow(event).to receive(:sprintf).with(key).and_return(sprinted_key)
instance.send(:publish, event, encoded_event)
end

it "should send the correct message" do
instance.send(:publish, event, encoded_event)
expect(exchange).to have_received(:publish).with(encoded_event, anything)
end

it "should send the correct metadata" do
expected_metadata = {:routing_key => sprinted_key, :properties => {:persistent => persistent }}

instance.send(:publish, event, encoded_event)

expect(exchange).to have_received(:publish).with(anything, expected_metadata)
end

context 'with message_properties' do
let(:rabbitmq_settings) { super().merge("message_properties" => message_properties) }
let(:message_properties) { Hash.new }
context 'priority' do
let(:message_properties) { super().merge("priority" => priority) }
context 'as literal Integer value' do
let(:priority) { 3 }
it 'publishes with the constant-value priority' do
instance.send(:publish, event, encoded_event)
expect(exchange).to have_received(:publish).with(anything, hash_including(:properties => hash_including(:priority => 3)))
end
end

context 'as literal String value' do
let(:priority) { "7" }
it 'publishes with the constant-value priority' do
instance.send(:publish, event, encoded_event)
expect(exchange).to have_received(:publish).with(anything, hash_including(:properties => hash_including(:priority => 7)))
end
end

context 'as template value' do
let(:priority) { "%{[@metadata][priority]}" }
context 'when event expands template value' do
before do
expect(event).to receive(:sprintf).with(priority).and_return("31")
end

it 'publishes with the priority extracted from the event' do
instance.send(:publish, event, encoded_event)
expect(exchange).to have_received(:publish).with(anything, hash_including(:properties => hash_including(:priority => 31)))
end
end
context 'when event cannot expand template value' do
before do
expect(event).to receive(:sprintf).with(priority).and_return(priority)
end

it 'publishes with the priority of zero (`0`)' do
instance.send(:publish, event, encoded_event)
expect(exchange).to have_received(:publish).with(anything, hash_including(:properties => hash_including(:priority => 0)))
end
end
end
end
end
end

context 'when an exception is encountered' do
Expand Down

0 comments on commit 1030662

Please sign in to comment.