Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCOL-2441-fix: Fix for allowing custom publishing batch size per producer #218

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/deimos/config/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def self.configure_producer_or_consumer(kafka_config)
Deimos.config.consumers.bulk_import_id_generator,
save_associations_first: kafka_config.save_associations_first
)
else
klass.config[:max_batch_size] = kafka_config.max_batch_size || Deimos.config.producers.max_batch_size
end
end
end
Expand Down
9 changes: 1 addition & 8 deletions lib/deimos/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ class << self
def config
@config ||= {
encode_key: true,
namespace: Deimos.config.producers.schema_namespace,
max_batch_size: Deimos.config.producers.max_batch_size
namespace: Deimos.config.producers.schema_namespace
}
end

Expand All @@ -90,12 +89,6 @@ def partition_key(_payload)
nil
end

# @param size [Integer] Override the default batch size for publishing.
# @return [void]
def max_batch_size(size)
config[:max_batch_size] = size
end

# Publish the payload to the topic.
# @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key.
# @param topic [String] if specifying the topic
Expand Down
34 changes: 0 additions & 34 deletions spec/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,6 @@ def self.partition_key(payload)
key_config none: true
end
stub_const('MyNoTopicProducer', producer_class)

producer_class = Class.new(Deimos::Producer) do
schema 'MySchema'
namespace 'com.my-namespace'
topic 'my-topic'
key_config field: 'test_id'
max_batch_size 1
end
stub_const('MySmallBatchProducer', producer_class)
end

it 'should fail on invalid message with error handler' do
Expand Down Expand Up @@ -615,30 +606,5 @@ def self.partition_key(payload)
end
end

describe "max_batch_size" do
it 'should use top-level default value if max_batch_size is not defined by the producer' do
expect(MyProducer.config[:max_batch_size]).to eq(500)
end

it 'should call produce_batch multiple times when max_batch_size < records size' do
Deimos::Message.new({ 'test_id' => 'foo', 'some_int' => 123 },
MySmallBatchProducer,
topic: 'my-topic',
partition_key: 'foo',
key: 'foo')
Deimos::Message.new({ 'test_id' => 'bar', 'some_int' => 124 },
MySmallBatchProducer,
topic: 'my-topic',
partition_key: 'bar',
key: 'bar')
expect(described_class).to receive(:produce_batch).twice

MySmallBatchProducer.publish_list(
[{ 'test_id' => 'foo', 'some_int' => 123 },
{ 'test_id' => 'bar', 'some_int' => 124 }]
)
end
end

end
end
Loading