diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb index ac73e6b..819b402 100644 --- a/lib/deimos/config/configuration.rb +++ b/lib/deimos/config/configuration.rb @@ -100,6 +100,8 @@ def self.configure_producer_or_consumer(kafka_config) Deimos.config.consumers.bulk_import_id_generator, save_associations_first: kafka_config.save_associations_first ) + else + klass.config[:max_batch_size] = kafka_config.max_batch_size || Deimos.config.producers.max_batch_size end end end diff --git a/lib/deimos/producer.rb b/lib/deimos/producer.rb index 646fb33..ebdca27 100644 --- a/lib/deimos/producer.rb +++ b/lib/deimos/producer.rb @@ -65,8 +65,7 @@ class << self def config @config ||= { encode_key: true, - namespace: Deimos.config.producers.schema_namespace, - max_batch_size: Deimos.config.producers.max_batch_size + namespace: Deimos.config.producers.schema_namespace } end @@ -90,12 +89,6 @@ def partition_key(_payload) nil end - # @param size [Integer] Override the default batch size for publishing. - # @return [void] - def max_batch_size(size) - config[:max_batch_size] = size - end - # Publish the payload to the topic. # @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key. # @param topic [String] if specifying the topic diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index a841d3b..b87d3a1 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -71,15 +71,6 @@ def self.partition_key(payload) key_config none: true end stub_const('MyNoTopicProducer', producer_class) - - producer_class = Class.new(Deimos::Producer) do - schema 'MySchema' - namespace 'com.my-namespace' - topic 'my-topic' - key_config field: 'test_id' - max_batch_size 1 - end - stub_const('MySmallBatchProducer', producer_class) end it 'should fail on invalid message with error handler' do @@ -615,30 +606,5 @@ def self.partition_key(payload) end end - describe "max_batch_size" do - it 'should use top-level default value if max_batch_size is not defined by the producer' do - expect(MyProducer.config[:max_batch_size]).to eq(500) - end - - it 'should call produce_batch multiple times when max_batch_size < records size' do - Deimos::Message.new({ 'test_id' => 'foo', 'some_int' => 123 }, - MySmallBatchProducer, - topic: 'my-topic', - partition_key: 'foo', - key: 'foo') - Deimos::Message.new({ 'test_id' => 'bar', 'some_int' => 124 }, - MySmallBatchProducer, - topic: 'my-topic', - partition_key: 'bar', - key: 'bar') - expect(described_class).to receive(:produce_batch).twice - - MySmallBatchProducer.publish_list( - [{ 'test_id' => 'foo', 'some_int' => 123 }, - { 'test_id' => 'bar', 'some_int' => 124 }] - ) - end - end - end end