feat: added topic exists check (#59) (#131)
* feat: added topic exists check (#59)

* chore:

* chore: dummy

* feat: implement topic verify process

* chore: lint fix

* feat: bump kafka-cronsumer verify topic commit

* feat: add verifyTopicOnStartup flag

---------

Co-authored-by: keremcankabadayi <[email protected]>
A.Samet İleri and keremcankabadayi committed Jun 3, 2024
1 parent b4d988b commit ca225fb
Showing 10 changed files with 306 additions and 31 deletions.
27 changes: 15 additions & 12 deletions README.md
@@ -1,5 +1,6 @@
# Kafka Konsumer

[![🔨Build And Test](https://github.com/Trendyol/kafka-konsumer/actions/workflows/test.yml/badge.svg?branch=main)](https://github.com/Trendyol/kafka-konsumer/actions/workflows/test.yml)
[![🔨IntegrationTest](https://github.com/Trendyol/kafka-konsumer/actions/workflows/integration-test.yml/badge.svg?branch=main)](https://github.com/Trendyol/kafka-konsumer/actions/workflows/integration-test.yml)
[![Go Report Card](https://goreportcard.com/badge/github.com/Trendyol/kafka-konsumer/v2)](https://goreportcard.com/report/github.com/Trendyol/kafka-konsumer/v2)

@@ -13,19 +14,19 @@ manager ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer)).
## Migration Guide

### V2 Release Notes

- Added the ability to manipulate Kafka message headers.
- Added a transactional retry feature. Set it to false if you want to apply the exception/retry strategy only to failed messages.
- Enabled manual commit in both single and batch consuming modes.
- Enabled consumer resume/pause functionality. Please refer to [its example](examples/with-pause-resume-consumer) and
  [how it works](examples/with-pause-resume-consumer/how-it-works.md) documentation.
- Bumped [kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer/releases) to the latest version:
  - Backoff strategy support (linear, exponential options)
  - Added message key for retried messages
  - Added x-error-message header to see what the error of the message was during processing
- Reduced memory allocation.
- Increased throughput by changing the internal concurrency structure.

### How to migrate from v1 to v2?

You can get the latest version via `go get github.com/Trendyol/kafka-konsumer/v2@latest`
@@ -35,7 +36,7 @@ You can get latest version via `go get github.com/Trendyol/kafka-konsumer/v2@lat
- You need to change your consume function to use a pointer signature.

- We moved messageGroupDuration from `batchConfiguration.messageGroupDuration` to the root level, because this field is
  used by the single (non-batch) consumer too.

### Installation

@@ -229,6 +230,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `retryEnabled`         | Retry/Exception consumer is working or not                                          | false |
| `transactionalRetry`   | Set false if you want to apply the exception/retry strategy only to failed messages | true  |
| `commitInterval`       | Indicates the interval at which offsets are committed to the broker.                | 1s    |
| `verifyTopicOnStartup` | Checks the existence of the given topic(s) on the Kafka cluster at startup.         | false |
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | |
| `messageGroupDuration` | Maximum time to wait for a batch | 1s |
@@ -255,6 +257,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | |
| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | |
| `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil |
| `retryConfiguration.verifyTopicOnStartup`  | Checks the existence of the given retry topic on the Kafka cluster at startup.            | false |
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
| `batchConfiguration.messageGroupByteSizeLimit` | Maximum number of bytes in a batch | |
| `batchConfiguration.batchConsumeFn`             | Kafka batch consumer function; if retry is enabled, it is also used to consume retriable messages |  |
@@ -277,7 +280,7 @@ Kafka Konsumer offers an API that handles exposing several metrics.

### Exposed Metrics

| Metric Name                                        | Description                           | Value Type |
|----------------------------------------------------|---------------------------------------|------------|
| kafka_konsumer_processed_messages_total_current    | Total number of processed messages.   | Counter    |
| kafka_konsumer_unprocessed_messages_total_current  | Total number of unprocessed messages. | Counter    |
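Taken together, the two new flags documented in the tables above can be wired up as below — a minimal sketch, not a complete working config: the broker address, topic names, and the empty consume function are hypothetical placeholders, and only the `VerifyTopicOnStartup` fields come from this commit.

```go
// Sketch only: brokers, topics, and the consume body are placeholders.
consumer, err := kafka.NewConsumer(&kafka.ConsumerConfig{
	Reader: kafka.ReaderConfig{
		Brokers: []string{"localhost:29092"}, // hypothetical broker
		Topic:   "orders",                    // hypothetical topic
		GroupID: "orders-cg",
	},
	// Fail fast at startup if the topic is missing on the cluster.
	VerifyTopicOnStartup: true,
	RetryEnabled:         true,
	RetryConfiguration: kafka.RetryConfiguration{
		Topic: "orders-retry", // hypothetical retry topic
		// Also verify the retry topic before the cronsumer starts.
		VerifyTopicOnStartup: true,
	},
	ConsumeFn: func(message *kafka.Message) error {
		return nil // process the message here
	},
})
if err != nil {
	// With verification enabled, a missing topic surfaces here
	// instead of as silent consume failures later.
	panic(err)
}
defer consumer.Stop()
```

When either flag is set, `NewConsumer` returns an error at construction time rather than starting a consumer against a nonexistent topic.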
16 changes: 16 additions & 0 deletions consumer_base.go
@@ -2,6 +2,7 @@ package kafka

import (
	"context"
	"fmt"
	"sync"
	"time"

@@ -96,6 +97,21 @@ func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
	log := NewZapLogger(cfg.LogLevel)

	if cfg.VerifyTopicOnStartup {
		kclient, err := newKafkaClient(cfg)
		if err != nil {
			return nil, err
		}
		exist, err := verifyTopics(kclient, cfg)
		if err != nil {
			return nil, err
		}
		if !exist {
			return nil, fmt.Errorf("topics %s do not exist, please check cluster authority etc", cfg.getTopics())
		}
		log.Infof("Topic [%s] verified successfully!", cfg.getTopics())
	}

	reader, err := cfg.newKafkaReader()
	if err != nil {
		log.Errorf("Error when initializing kafka reader %v", err)
45 changes: 28 additions & 17 deletions consumer_config.go
@@ -43,6 +43,7 @@ type ConsumerConfig struct {
	RetryConfiguration   RetryConfiguration
	LogLevel             LogLevel
	Rack                 string
	VerifyTopicOnStartup bool
	ClientID             string
	Reader               ReaderConfig
	CommitInterval       time.Duration
@@ -66,23 +67,24 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
		ClientID: cfg.RetryConfiguration.ClientID,
		Brokers:  cfg.RetryConfiguration.Brokers,
		Consumer: kcronsumer.ConsumerConfig{
			ClientID:             cfg.ClientID,
			GroupID:              cfg.Reader.GroupID,
			Topic:                cfg.RetryConfiguration.Topic,
			DeadLetterTopic:      cfg.RetryConfiguration.DeadLetterTopic,
			Cron:                 cfg.RetryConfiguration.StartTimeCron,
			Duration:             cfg.RetryConfiguration.WorkDuration,
			MaxRetry:             cfg.RetryConfiguration.MaxRetry,
			VerifyTopicOnStartup: cfg.RetryConfiguration.VerifyTopicOnStartup,
			Concurrency:          cfg.Concurrency,
			MinBytes:             cfg.Reader.MinBytes,
			MaxBytes:             cfg.Reader.MaxBytes,
			MaxWait:              cfg.Reader.MaxWait,
			CommitInterval:       cfg.Reader.CommitInterval,
			HeartbeatInterval:    cfg.Reader.HeartbeatInterval,
			SessionTimeout:       cfg.Reader.SessionTimeout,
			RebalanceTimeout:     cfg.Reader.RebalanceTimeout,
			StartOffset:          kcronsumer.ToStringOffset(cfg.Reader.StartOffset),
			RetentionTime:        cfg.Reader.RetentionTime,
		},
Producer: kcronsumer.ProducerConfig{
Balancer: cfg.RetryConfiguration.Balancer,
@@ -113,6 +115,14 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
	return &cronsumerCfg
}

func (cfg *ConsumerConfig) getTopics() []string {
	if len(cfg.Reader.GroupTopics) > 0 {
		return cfg.Reader.GroupTopics
	}

	return []string{cfg.Reader.Topic}
}

type APIConfiguration struct {
	// Port default is 8090
	Port *int
@@ -157,6 +167,7 @@ type RetryConfiguration struct {
	Topic                string
	DeadLetterTopic      string
	Rack                 string
	VerifyTopicOnStartup bool
	LogLevel             LogLevel
	Brokers              []string
	Balancer             Balancer
35 changes: 35 additions & 0 deletions consumer_config_test.go
@@ -138,3 +138,38 @@ func Test_toHeader(t *testing.T) {
		}
	})
}

func TestConsumerConfig_getTopics(t *testing.T) {
	t.Run("Should_Get_Consumer_Group_Topics", func(t *testing.T) {
		// Given
		cfg := ConsumerConfig{
			Reader: ReaderConfig{
				GroupTopics: []string{"t1", "t2", "t3"},
			},
		}

		// When
		result := cfg.getTopics()

		// Then
		if len(result) != 3 {
			t.Error("len of result must be equal 3")
		}
	})
	t.Run("Should_Get_Topic", func(t *testing.T) {
		// Given
		cfg := ConsumerConfig{
			Reader: ReaderConfig{
				Topic: "t1",
			},
		}

		// When
		result := cfg.getTopics()

		// Then
		if len(result) != 1 {
			t.Error("len of result must be equal 1")
		}
	})
}
2 changes: 1 addition & 1 deletion go.mod
@@ -3,7 +3,7 @@ module github.com/Trendyol/kafka-konsumer/v2
go 1.19

require (
	github.com/Trendyol/kafka-cronsumer v1.5.3
	github.com/Trendyol/otel-kafka-konsumer v0.0.7
	github.com/ansrivas/fiberprometheus/v2 v2.6.1
	github.com/gofiber/fiber/v2 v2.52.1
6 changes: 6 additions & 0 deletions go.sum
@@ -3,6 +3,12 @@ github.com/Trendyol/kafka-cronsumer v1.5.1/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNu
github.com/Trendyol/kafka-cronsumer v1.5.2-0.20240529192345-3622137cb12a/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.2 h1:wllvb2BhAWFnjN7wPVXVZ14lxPWAnLVlxKxoxIekQRI=
github.com/Trendyol/kafka-cronsumer v1.5.2/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.3-0.20240602191646-ecba325784a5 h1:KVnsVsH/JKSzFjL7kDHMRjQzLzE/y40CB6PphVlsrwQ=
github.com/Trendyol/kafka-cronsumer v1.5.3-0.20240602191646-ecba325784a5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.3-0.20240603202122-cbc6017e8d40 h1:ogMKdiQIPmOqdDwAeoPHbXOsjFIcKXVbv9C4AVwZ8Hc=
github.com/Trendyol/kafka-cronsumer v1.5.3-0.20240603202122-cbc6017e8d40/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.3 h1:I3x7KUceHlae69MyBYx6Vj1ctMexeIKEUq2xNg0wvG8=
github.com/Trendyol/kafka-cronsumer v1.5.3/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
2 changes: 1 addition & 1 deletion test/integration/go.mod
@@ -10,7 +10,7 @@ require (
)

require (
	github.com/Trendyol/kafka-cronsumer v1.5.3 // indirect
	github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect
	github.com/andybalholm/brotli v1.0.5 // indirect
	github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
4 changes: 4 additions & 0 deletions test/integration/go.sum
@@ -2,6 +2,10 @@ github.com/Trendyol/kafka-cronsumer v1.5.0 h1:MI0/ncHrlCvOV0Ro4h9avm2izsNprBw4Qf
github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.1 h1:L8RLxo8zSGOfVpjtXLUqL3PsJLZdeoFcOvN1yCY/GyQ=
github.com/Trendyol/kafka-cronsumer v1.5.1/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.2/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.3-0.20240602191646-ecba325784a5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.3-0.20240603202122-cbc6017e8d40/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.3/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
70 changes: 70 additions & 0 deletions verify_topic.go
@@ -0,0 +1,70 @@
package kafka

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

type kafkaClient interface {
	Metadata(ctx context.Context, req *kafka.MetadataRequest) (*kafka.MetadataResponse, error)
	GetClient() *kafka.Client
}

type client struct {
	*kafka.Client
}

func newKafkaClient(cfg *ConsumerConfig) (kafkaClient, error) {
	kc := client{
		Client: &kafka.Client{
			Addr: kafka.TCP(cfg.Reader.Brokers...),
		},
	}

	transport := &Transport{
		Transport: &kafka.Transport{
			MetadataTopics: cfg.getTopics(),
		},
	}
	if err := fillLayer(transport, cfg.SASL, cfg.TLS); err != nil {
		return nil, fmt.Errorf("error when initializing kafka client for verify topic purpose: %w", err)
	}

	kc.Transport = transport
	return &kc, nil
}

func (k *client) GetClient() *kafka.Client {
	return k.Client
}

func verifyTopics(client kafkaClient, cfg *ConsumerConfig) (bool, error) {
	topics := cfg.getTopics()

	metadata, err := client.Metadata(context.Background(), &kafka.MetadataRequest{
		Topics: topics,
	})
	if err != nil {
		return false, fmt.Errorf("error during verifyTopics metadata request: %w", err)
	}
	return checkTopicsWithinMetadata(metadata, topics)
}

func checkTopicsWithinMetadata(metadata *kafka.MetadataResponse, topics []string) (bool, error) {
	metadataTopics := make(map[string]struct{}, len(metadata.Topics))
	for _, topic := range metadata.Topics {
		if topic.Error != nil {
			continue
		}
		metadataTopics[topic.Name] = struct{}{}
	}

	for _, topic := range topics {
		if _, exist := metadataTopics[topic]; !exist {
			return false, nil
		}
	}
	return true, nil
}