diff --git a/README.md b/README.md index a31f2cb..1c71c86 100644 --- a/README.md +++ b/README.md @@ -258,7 +258,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap | `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | | | `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | | `retryConfiguration.verifyTopicOnStartup` | it checks existence of the given retry topic on the kafka cluster. | false | -| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | | +| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | 100 | | `batchConfiguration.messageGroupByteSizeLimit` | Maximum number of bytes in a batch | | | `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | | | `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | | diff --git a/consumer_config.go b/consumer_config.go index a6daec0..e3cf8d1 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -246,6 +246,10 @@ func (cfg *ConsumerConfig) setDefaults() { cfg.MessageGroupDuration = time.Second } + if cfg.BatchConfiguration != nil && cfg.BatchConfiguration.MessageGroupLimit == 0 { + cfg.BatchConfiguration.MessageGroupLimit = 100 + } + if cfg.DistributedTracingEnabled { if cfg.DistributedTracingConfiguration.Propagator == nil { cfg.DistributedTracingConfiguration.Propagator = otel.GetTextMapPropagator() diff --git a/consumer_config_test.go b/consumer_config_test.go index 6b9203c..97e03ad 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -32,6 +32,22 @@ func TestConsumerConfig_validate(t *testing.T) { if cfg.MessageGroupDuration != time.Second { t.Fatal("Message Group Duration default value must equal to 1s") } + if cfg.BatchConfiguration != nil { + t.Fatalf("Batch configuration not specified so it must stay as nil") + } + }) + t.Run("Set_Defaults_For_BatchConfiguration", func(t *testing.T) { + // Given + cfg := ConsumerConfig{BatchConfiguration: &BatchConfiguration{}} + + // When + cfg.setDefaults() + + // Then + + if cfg.BatchConfiguration.MessageGroupLimit != 100 { + t.Fatalf("MessageGroupLimit Batch configuration not specified so it must take default value") + } }) t.Run("Set_Defaults_When_Distributed_Tracing_Enabled", func(t *testing.T) { // Given