Skip to content

Commit

Permalink
feat: provide message group limit default (#134)
Browse files Browse the repository at this point in the history
* feature: provide message group limit default

* chore: example limit duration specified stay
  • Loading branch information
A.Samet İleri committed Jul 17, 2024
1 parent a5a8628 commit ff0c6b1
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | |
| `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil |
| `retryConfiguration.verifyTopicOnStartup` | it checks existence of the given retry topic on the kafka cluster. | false |
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | 100 |
| `batchConfiguration.messageGroupByteSizeLimit` | Maximum number of bytes in a batch | |
| `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | |
| `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | |
Expand Down
4 changes: 4 additions & 0 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ func (cfg *ConsumerConfig) setDefaults() {
cfg.MessageGroupDuration = time.Second
}

if cfg.BatchConfiguration != nil && cfg.BatchConfiguration.MessageGroupLimit == 0 {
cfg.BatchConfiguration.MessageGroupLimit = 100
}

if cfg.DistributedTracingEnabled {
if cfg.DistributedTracingConfiguration.Propagator == nil {
cfg.DistributedTracingConfiguration.Propagator = otel.GetTextMapPropagator()
Expand Down
16 changes: 16 additions & 0 deletions consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ func TestConsumerConfig_validate(t *testing.T) {
if cfg.MessageGroupDuration != time.Second {
t.Fatal("Message Group Duration default value must equal to 1s")
}
if cfg.BatchConfiguration != nil {
t.Fatalf("Batch configuration not specified so it must stay as nil")
}
})
t.Run("Set_Defaults_For_BatchConfiguration", func(t *testing.T) {
// Given
cfg := ConsumerConfig{BatchConfiguration: &BatchConfiguration{}}

// When
cfg.setDefaults()

// Then

if cfg.BatchConfiguration.MessageGroupLimit != 100 {
t.Fatalf("MessageGroupLimit Batch configuration not specified so it must take default value")
}
})
t.Run("Set_Defaults_When_Distributed_Tracing_Enabled", func(t *testing.T) {
// Given
Expand Down

0 comments on commit ff0c6b1

Please sign in to comment.