diff --git a/CHANGELOG.md b/CHANGELOG.md
index f69b5cda..695dbe85 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Fix: Don't update last_sent to current time on every poll.
 - Feature: Allow for infinite retries in DB poller.
+- Feature: Add support for message headers.
 
 # 1.22.2 - 2023-05-10
 - Feature: Add `DEIMOS_TASK_NAME` env variable when running a task (consumer, DB poller, DB producer).
diff --git a/README.md b/README.md
index 7c2326de..be9628ad 100644
--- a/README.md
+++ b/README.md
@@ -123,6 +123,7 @@ class MyProducer < Deimos::Producer
     }
     # You can also publish an array with self.publish_list(payloads)
     # You may specify the topic here with self.publish(payload, topic: 'my-topic')
+    # You may also specify the headers here with self.publish(payload, headers: { 'foo' => 'bar' })
     self.publish(payload)
   end
 
@@ -1171,13 +1172,14 @@ end
 # A matcher which allows you to test that a message was sent on the given
 # topic, without having to know which class produced it.
-expect(topic_name).to have_sent(payload, key=nil)
+expect(topic_name).to have_sent(payload, key=nil, partition_key=nil, headers=nil)
 
 # Inspect sent messages
 message = Deimos::Backends::Test.sent_messages[0]
 expect(message).to eq({
   message: {'some-key' => 'some-value'},
   topic: 'my-topic',
+  headers: { 'foo' => 'bar' },
   key: 'my-id'
 })
 ```
diff --git a/lib/deimos.rb b/lib/deimos.rb
index 92da2837..1ac0e24b 100644
--- a/lib/deimos.rb
+++ b/lib/deimos.rb
@@ -23,7 +23,6 @@
 require 'deimos/schema_class/enum'
 require 'deimos/schema_class/record'
 
-require 'deimos/monkey_patches/phobos_producer'
 require 'deimos/monkey_patches/phobos_cli'
 
 require 'deimos/railtie' if defined?(Rails)
diff --git a/lib/deimos/backends/base.rb b/lib/deimos/backends/base.rb
index d6ca2ab7..00f9da6b 100644
--- a/lib/deimos/backends/base.rb
+++ b/lib/deimos/backends/base.rb
@@ -37,6 +37,10 @@ def log_message(messages)
           log_message.merge!(
             payloads_count: messages.count
           )
+        when :headers
+          log_message.merge!(
+            payload_headers: messages.map(&:headers)
+          )
         else
           log_message.merge!(
             payloads: messages.map do |message|
diff --git a/lib/deimos/message.rb b/lib/deimos/message.rb
index e4f354e7..2e5cf1f8 100644
--- a/lib/deimos/message.rb
+++ b/lib/deimos/message.rb
@@ -7,6 +7,8 @@ class Message
     attr_accessor :payload
     # @return [Hash, String, Integer]
     attr_accessor :key
+    # @return [Hash]
+    attr_accessor :headers
     # @return [Integer]
     attr_accessor :partition_key
     # @return [String]
@@ -23,11 +25,12 @@ class Message
     # @param topic [String]
     # @param key [String, Integer, Hash]
     # @param partition_key [Integer]
-    def initialize(payload, producer, topic: nil, key: nil, partition_key: nil)
+    def initialize(payload, producer, topic: nil, key: nil, headers: nil, partition_key: nil)
       @payload = payload&.with_indifferent_access
       @producer_name = producer&.name
       @topic = topic
       @key = key
+      @headers = headers&.with_indifferent_access
       @partition_key = partition_key
     end
 
@@ -59,13 +62,14 @@ def encoded_hash
       {
         topic: @topic,
         key: @encoded_key,
+        headers: @headers,
         partition_key: @partition_key || @encoded_key,
         payload: @encoded_payload,
         metadata: {
           decoded_payload: @payload,
           producer_name: @producer_name
         }
-      }
+      }.delete_if { |k, v| k == :headers && v.nil? }
     end
 
     # @return [Hash]
@@ -73,13 +77,14 @@ def to_h
       {
         topic: @topic,
         key: @key,
+        headers: @headers,
         partition_key: @partition_key || @key,
         payload: @payload,
         metadata: {
           decoded_payload: @payload,
           producer_name: @producer_name
         }
-      }
+      }.delete_if { |k, v| k == :headers && v.nil? }
     end
 
     # @param other [Message]
diff --git a/lib/deimos/monkey_patches/phobos_producer.rb b/lib/deimos/monkey_patches/phobos_producer.rb
deleted file mode 100644
index c8402f95..00000000
--- a/lib/deimos/monkey_patches/phobos_producer.rb
+++ /dev/null
@@ -1,52 +0,0 @@
-# frozen_string_literal: true
-
-require 'phobos/producer'
-
-#@!visibility private
-module Phobos
-  module Producer
-    # :nodoc:
-    class PublicAPI
-      # :nodoc:
-      def publish(topic, payload, key=nil, partition_key=nil)
-        class_producer.publish(topic, payload, key, partition_key)
-      end
-
-      # :nodoc:
-      def async_publish(topic, payload, key=nil, partition_key=nil)
-        class_producer.async_publish(topic, payload, key, partition_key)
-      end
-    end
-
-    # :nodoc:
-    module ClassMethods
-      # :nodoc:
-      class PublicAPI
-        # :nodoc:
-        def publish(topic, payload, key=nil, partition_key=nil)
-          publish_list([{ topic: topic, payload: payload, key: key,
-                          partition_key: partition_key }])
-        end
-
-        # :nodoc:
-        def async_publish(topic, payload, key=nil, partition_key=nil)
-          async_publish_list([{ topic: topic, payload: payload, key: key,
-                                partition_key: partition_key }])
-        end
-
-        private
-
-        # :nodoc:
-        def produce_messages(producer, messages)
-          messages.each do |message|
-            partition_key = message[:partition_key] || message[:key]
-            producer.produce(message[:payload],
-                             topic: message[:topic],
-                             key: message[:key],
-                             partition_key: partition_key)
-          end
-        end
-      end
-    end
-  end
-end
diff --git a/lib/deimos/producer.rb b/lib/deimos/producer.rb
index 9b594f1f..6b021525 100644
--- a/lib/deimos/producer.rb
+++ b/lib/deimos/producer.rb
@@ -95,9 +95,10 @@ def partition_key(_payload)
       # Publish the payload to the topic.
       # @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key.
      # @param topic [String] if specifying the topic
+      # @param headers [Hash] if specifying headers
       # @return [void]
-      def publish(payload, topic: self.topic)
-        publish_list([payload], topic: topic)
+      def publish(payload, topic: self.topic, headers: nil)
+        publish_list([payload], topic: topic, headers: headers)
       end
 
       # Publish a list of messages.
@@ -107,8 +108,9 @@ def publish(payload, topic: self.topic)
       # @param force_send [Boolean] if true, ignore the configured backend
       #   and send immediately to Kafka.
       # @param topic [String] if specifying the topic
+      # @param headers [Hash] if specifying headers
       # @return [void]
-      def publish_list(payloads, sync: nil, force_send: false, topic: self.topic)
+      def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil)
         return if Deimos.config.kafka.seed_brokers.blank? ||
                   Deimos.config.producers.disabled ||
                   Deimos.producers_disabled?(self)
@@ -122,7 +124,7 @@ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic)
           topic: topic,
           payloads: payloads
         ) do
-          messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self) }
+          messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self, headers: headers) }
           messages.each { |m| _process_message(m, topic) }
           messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch|
             self.produce_batch(backend_class, batch)
diff --git a/lib/deimos/test_helpers.rb b/lib/deimos/test_helpers.rb
index 983fd83f..c1afcd7e 100644
--- a/lib/deimos/test_helpers.rb
+++ b/lib/deimos/test_helpers.rb
@@ -133,7 +133,7 @@ def _frk_failure_message(topic, message, key=nil, partition_key=nil, was_negated
       str + "\nAll Messages received:\n#{message_string}"
     end
 
-    RSpec::Matchers.define :have_sent do |msg, key=nil, partition_key=nil|
+    RSpec::Matchers.define :have_sent do |msg, key=nil, partition_key=nil, headers=nil|
       message = if msg.respond_to?(:with_indifferent_access)
                   msg.with_indifferent_access
                 else
@@ -147,7 +147,14 @@ def _frk_failure_message(topic, message, key=nil, partition_key=nil, was_negated
             m[:payload]&.with_indifferent_access) &&
             topic == m[:topic] &&
             (key.present? ? key == m[:key] : true) &&
-            (partition_key.present? ? partition_key == m[:partition_key] : true)
+            (partition_key.present? ? partition_key == m[:partition_key] : true) &&
+            if headers.present?
+              hash_matcher.send(:match,
+                                headers&.with_indifferent_access,
+                                m[:headers]&.with_indifferent_access)
+            else
+              true
+            end
         end
       end
 
diff --git a/spec/message_spec.rb b/spec/message_spec.rb
index ebb44d16..0ef89cc7 100644
--- a/spec/message_spec.rb
+++ b/spec/message_spec.rb
@@ -16,4 +16,24 @@
     expect { described_class.new({ a: 1, b: 2 }, nil, key: { c: 3, d: 4 }) }.
       not_to raise_exception
   end
+
+  describe 'headers' do
+    it 'returns nil when not set' do
+      expect(described_class.new({ v: 'val1' }, nil, key: 'key1')).
+        to have_attributes(headers: nil)
+    end
+
+    it 'can set and get headers' do
+      expect(described_class.new({ v: 'val1' }, nil, key: 'key1', headers: { a: 1 })).
+        to have_attributes(headers: { a: 1 })
+    end
+
+    it 'includes headers when converting to Hash' do
+      expect(described_class.new({ v: 'val1' }, nil, key: 'key1', headers: { a: 1 }).to_h).
+        to include(headers: { a: 1 })
+
+      expect(described_class.new({ v: 'val1' }, nil, key: 'key1', headers: { a: 1 }).encoded_hash).
+        to include(headers: { a: 1 })
+    end
+  end
 end
diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb
index 82c613f9..c55fdc23 100644
--- a/spec/producer_spec.rb
+++ b/spec/producer_spec.rb
@@ -110,18 +110,20 @@ def self.partition_key(payload)
       expect('my-topic').not_to have_sent('test_id' => 'foo2', 'some_int' => 123)
     end
 
-    it 'should allow setting the topic from publish_list' do
+    it 'should allow setting the topic and headers from publish_list' do
       expect(described_class).to receive(:produce_batch).once.with(
         Deimos::Backends::Test,
         [
          Deimos::Message.new({ 'test_id' => 'foo', 'some_int' => 123 },
                              MyProducer,
                              topic: 'a-new-topic',
+                             headers: { 'foo' => 'bar' },
                              partition_key: 'foo',
                              key: 'foo'),
          Deimos::Message.new({ 'test_id' => 'bar', 'some_int' => 124 },
                              MyProducer,
                              topic: 'a-new-topic',
+                             headers: { 'foo' => 'bar' },
                              partition_key: 'bar',
                              key: 'bar')
        ]
@@ -130,9 +132,10 @@ def self.partition_key(payload)
       MyProducer.publish_list(
         [{ 'test_id' => 'foo', 'some_int' => 123 },
          { 'test_id' => 'bar', 'some_int' => 124 }],
-        topic: 'a-new-topic'
+        topic: 'a-new-topic',
+        headers: { 'foo' => 'bar' }
       )
-      expect('a-new-topic').to have_sent('test_id' => 'foo', 'some_int' => 123)
+      expect('a-new-topic').to have_sent({ 'test_id' => 'foo', 'some_int' => 123 }, nil, nil, { 'foo' => 'bar' })
      expect('my-topic').not_to have_sent('test_id' => 'foo', 'some_int' => 123)
       expect('my-topic').not_to have_sent('test_id' => 'foo2', 'some_int' => 123)
    end
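
For reference, a minimal usage sketch of the headers support added in this diff. The producer class, its schema/namespace/topic/key configuration, the payload fields, and the header values below are illustrative placeholders only (they follow the usual Deimos producer DSL from the README) and are not part of the change itself:

```ruby
# Hypothetical producer; the schema/topic/key configuration is a placeholder.
class MyHeadersProducer < Deimos::Producer
  schema 'MySchema'
  namespace 'com.my-namespace'
  topic 'my-topic'
  key_config field: 'test_id'
end

# Publish a single payload with Kafka message headers.
MyHeadersProducer.publish(
  { 'test_id' => 'foo', 'some_int' => 123 },
  headers: { 'source' => 'my-service' }
)

# Publish a batch; every message in the list shares the same headers.
MyHeadersProducer.publish_list(
  [{ 'test_id' => 'foo', 'some_int' => 123 },
   { 'test_id' => 'bar', 'some_int' => 124 }],
  headers: { 'source' => 'my-service' }
)

# In specs, the extended matcher signature is
# have_sent(payload, key = nil, partition_key = nil, headers = nil).
expect('my-topic').to have_sent(
  { 'test_id' => 'foo', 'some_int' => 123 },
  nil, nil,
  { 'source' => 'my-service' }
)
```

Note also that the new `when :headers` branch in lib/deimos/backends/base.rb means a `payload_log` setting of `:headers` logs only `payload_headers` for each batch rather than the full payloads.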