Skip to content

Commit

Permalink
Added ability to add message_properties with variables.
Browse files Browse the repository at this point in the history
When adding message_properties dictionary (message_properties => {"content_type" => "const type"})
You can now use dynamic values from variables {"content_type" => "%{ContentType}"}
  • Loading branch information
Sophie135 committed Jan 29, 2018
1 parent 074f7e7 commit 053f4f4
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions lib/logstash/outputs/rabbitmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ def register
@thread_local_exchange = java.lang.ThreadLocal.new
end

def symbolize(myhash)
Hash[myhash.map{|(k,v)| [k.to_sym,v]}]
def symbolize(myhash, event)
Hash[myhash.map{|(k,v)| [k.to_sym, if v.is_a? String event.sprintf(v) else v end]}]
end

def multi_receive_encoded(events_and_data)
Expand All @@ -63,7 +63,7 @@ def multi_receive_encoded(events_and_data)

def publish(event, message)
raise ArgumentError, "No exchange set in HareInfo!!!" unless @hare_info.exchange
local_exchange.publish(message, :routing_key => event.sprintf(@key), :properties => symbolize(@message_properties.merge(:persistent => @persistent)))
local_exchange.publish(message, :routing_key => event.sprintf(@key), :properties => symbolize(@message_properties.merge(:persistent => @persistent), event))
rescue MarchHare::Exception, IOError, AlreadyClosedException, TimeoutException => e
@logger.error("Error while publishing. Will retry.",
:message => e.message,
Expand Down Expand Up @@ -97,4 +97,4 @@ def close
end
end
end
end
end

0 comments on commit 053f4f4

Please sign in to comment.