From ca225fb37872ac0a84143bce75113f483ecd62dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?A=2ESamet=20=C4=B0leri?= Date: Mon, 3 Jun 2024 23:47:58 +0300 Subject: [PATCH] feat: added topic exists check (#59) (#131) * feat: added topic exists check (#59) * chore: * chore: dummy * feat: implement topic verify process * chore: lint fix * feat: bump kafka-cronsumer verify topic commit * feat: add verifyTopicOnStartup flag --------- Co-authored-by: keremcankabadayi --- README.md | 27 +++++---- consumer_base.go | 16 +++++ consumer_config.go | 45 ++++++++------ consumer_config_test.go | 35 +++++++++++ go.mod | 2 +- go.sum | 6 ++ test/integration/go.mod | 2 +- test/integration/go.sum | 4 ++ verify_topic.go | 70 ++++++++++++++++++++++ verify_topic_test.go | 130 ++++++++++++++++++++++++++++++++++++++++ 10 files changed, 306 insertions(+), 31 deletions(-) create mode 100644 verify_topic.go create mode 100644 verify_topic_test.go diff --git a/README.md b/README.md index fda7072..14fb2a5 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ -# Kafka Konsumer -[![🔨Build And Test](https://github.com/Trendyol/kafka-konsumer/actions/workflows/test.yml/badge.svg?branch=main)](https://github.com/Trendyol/kafka-konsumer/actions/workflows/test.yml) +# Kafka Konsumer + +[![🔨Build And Test](https://github.com/Trendyol/kafka-konsumer/actions/workflows/test.yml/badge.svg?branch=main)](https://github.com/Trendyol/kafka-konsumer/actions/workflows/test.yml) [![🔨IntegrationTest](https://github.com/Trendyol/kafka-konsumer/actions/workflows/integration-test.yml/badge.svg?branch=main)](https://github.com/Trendyol/kafka-konsumer/actions/workflows/integration-test.yml) [![Go Report Card](https://goreportcard.com/badge/github.com/Trendyol/kafka-konsumer/v2)](https://goreportcard.com/report/github.com/Trendyol/kafka-konsumer/v2) @@ -13,19 +14,19 @@ manager ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer)). ## Migration Guide ### V2 Release Notes + - Added ability for manipulating kafka message headers. - Added transactional retry feature. Set false if you want to use exception/retry strategy to only failed messages. - Enable manuel commit at both single and batch consuming modes. - Enabling consumer resume/pause functionality. Please refer to [its example](examples/with-pause-resume-consumer) and -[how it works](examples/with-pause-resume-consumer/how-it-works.md) documentation. + [how it works](examples/with-pause-resume-consumer/how-it-works.md) documentation. - Bumped [kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer/releases) to the latest version: - - Backoff strategy support (linear, exponential options) - - Added message key for retried messages - - Added x-error-message to see what was the error of the message during processing + - Backoff strategy support (linear, exponential options) + - Added message key for retried messages + - Added x-error-message to see what was the error of the message during processing - Reduce memory allocation. - Increase TP on changing internal concurrency structure. - ### How to migrate from v1 to v2? You can get latest version via `go get github.com/Trendyol/kafka-konsumer/v2@latest` @@ -35,7 +36,7 @@ You can get latest version via `go get github.com/Trendyol/kafka-konsumer/v2@lat - You need to change your consume function with pointer signature. - We moved messageGroupDuration from `batchConfiguration.messageGroupDuration` to root level. Because this field is used -single (non-batch) consumer too. + single (non-batch) consumer too. ### Installation @@ -229,6 +230,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap | `retryEnabled` | Retry/Exception consumer is working or not | false | | `transactionalRetry` | Set false if you want to use exception/retry strategy to only failed messages | true | | `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s | +| `verifyTopicOnStartup` | it checks existence of the given topic(s) on the kafka cluster. | false | | `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | | | `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | | | `messageGroupDuration` | Maximum time to wait for a batch | 1s | @@ -255,6 +257,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap | `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | | | `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | | | `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | +| `retryConfiguration.verifyTopicOnStartup` | it checks existence of the given retry topic on the kafka cluster. | false | | `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | | | `batchConfiguration.messageGroupByteSizeLimit` | Maximum number of bytes in a batch | | | `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | | @@ -277,7 +280,7 @@ Kafka Konsumer offers an API that handles exposing several metrics. ### Exposed Metrics -| Metric Name | Description | Value Type | -|---------------------------------------------------------|---------------------------------------------|------------| -| kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter | -| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter | +| Metric Name | Description | Value Type | +|---------------------------------------------------|---------------------------------------|------------| +| kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter | +| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter | diff --git a/consumer_base.go b/consumer_base.go index 95ad55d..f96e9b2 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -2,6 +2,7 @@ package kafka import ( "context" + "fmt" "sync" "time" @@ -96,6 +97,21 @@ func NewConsumer(cfg *ConsumerConfig) (Consumer, error) { func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { log := NewZapLogger(cfg.LogLevel) + if cfg.VerifyTopicOnStartup { + kclient, err := newKafkaClient(cfg) + if err != nil { + return nil, err + } + exist, err := verifyTopics(kclient, cfg) + if err != nil { + return nil, err + } + if !exist { + return nil, fmt.Errorf("topics %s does not exist, please check cluster authority etc", cfg.getTopics()) + } + log.Infof("Topic [%s] verified successfully!", cfg.getTopics()) + } + reader, err := cfg.newKafkaReader() if err != nil { log.Errorf("Error when initializing kafka reader %v", err) diff --git a/consumer_config.go b/consumer_config.go index b30d9eb..a6daec0 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -43,6 +43,7 @@ type ConsumerConfig struct { RetryConfiguration RetryConfiguration LogLevel LogLevel Rack string + VerifyTopicOnStartup bool ClientID string Reader ReaderConfig CommitInterval time.Duration @@ -66,23 +67,24 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { ClientID: cfg.RetryConfiguration.ClientID, Brokers: cfg.RetryConfiguration.Brokers, Consumer: kcronsumer.ConsumerConfig{ - ClientID: cfg.ClientID, - GroupID: cfg.Reader.GroupID, - Topic: cfg.RetryConfiguration.Topic, - DeadLetterTopic: cfg.RetryConfiguration.DeadLetterTopic, - Cron: cfg.RetryConfiguration.StartTimeCron, - Duration: cfg.RetryConfiguration.WorkDuration, - Concurrency: cfg.Concurrency, - MinBytes: cfg.Reader.MinBytes, - MaxBytes: cfg.Reader.MaxBytes, - MaxRetry: cfg.RetryConfiguration.MaxRetry, - MaxWait: cfg.Reader.MaxWait, - CommitInterval: cfg.Reader.CommitInterval, - HeartbeatInterval: cfg.Reader.HeartbeatInterval, - SessionTimeout: cfg.Reader.SessionTimeout, - RebalanceTimeout: cfg.Reader.RebalanceTimeout, - StartOffset: kcronsumer.ToStringOffset(cfg.Reader.StartOffset), - RetentionTime: cfg.Reader.RetentionTime, + ClientID: cfg.ClientID, + GroupID: cfg.Reader.GroupID, + Topic: cfg.RetryConfiguration.Topic, + DeadLetterTopic: cfg.RetryConfiguration.DeadLetterTopic, + Cron: cfg.RetryConfiguration.StartTimeCron, + Duration: cfg.RetryConfiguration.WorkDuration, + MaxRetry: cfg.RetryConfiguration.MaxRetry, + VerifyTopicOnStartup: cfg.RetryConfiguration.VerifyTopicOnStartup, + Concurrency: cfg.Concurrency, + MinBytes: cfg.Reader.MinBytes, + MaxBytes: cfg.Reader.MaxBytes, + MaxWait: cfg.Reader.MaxWait, + CommitInterval: cfg.Reader.CommitInterval, + HeartbeatInterval: cfg.Reader.HeartbeatInterval, + SessionTimeout: cfg.Reader.SessionTimeout, + RebalanceTimeout: cfg.Reader.RebalanceTimeout, + StartOffset: kcronsumer.ToStringOffset(cfg.Reader.StartOffset), + RetentionTime: cfg.Reader.RetentionTime, }, Producer: kcronsumer.ProducerConfig{ Balancer: cfg.RetryConfiguration.Balancer, @@ -113,6 +115,14 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { return &cronsumerCfg } +func (cfg *ConsumerConfig) getTopics() []string { + if len(cfg.Reader.GroupTopics) > 0 { + return cfg.Reader.GroupTopics + } + + return []string{cfg.Reader.Topic} +} + type APIConfiguration struct { // Port default is 8090 Port *int @@ -157,6 +167,7 @@ type RetryConfiguration struct { Topic string DeadLetterTopic string Rack string + VerifyTopicOnStartup bool LogLevel LogLevel Brokers []string Balancer Balancer diff --git a/consumer_config_test.go b/consumer_config_test.go index 5855784..6b9203c 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -138,3 +138,38 @@ func Test_toHeader(t *testing.T) { } }) } + +func TestConsumerConfig_getTopics(t *testing.T) { + t.Run("Should_Get_Consumer_Group_Topics", func(t *testing.T) { + // Given + cfg := ConsumerConfig{ + Reader: ReaderConfig{ + GroupTopics: []string{"t1", "t2", "t3"}, + }, + } + + // When + result := cfg.getTopics() + + // Then + if len(result) != 3 { + t.Error("len of result must be equal 3") + } + }) + t.Run("Should_Get_Topic", func(t *testing.T) { + // Given + cfg := ConsumerConfig{ + Reader: ReaderConfig{ + Topic: "t1", + }, + } + + // When + result := cfg.getTopics() + + // Then + if len(result) != 1 { + t.Error("len of result must be equal 1") + } + }) +} diff --git a/go.mod b/go.mod index 7d0dc81..9c2f536 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Trendyol/kafka-konsumer/v2 go 1.19 require ( - github.com/Trendyol/kafka-cronsumer v1.5.2 + github.com/Trendyol/kafka-cronsumer v1.5.3 github.com/Trendyol/otel-kafka-konsumer v0.0.7 github.com/ansrivas/fiberprometheus/v2 v2.6.1 github.com/gofiber/fiber/v2 v2.52.1 diff --git a/go.sum b/go.sum index 377030a..acdb4bb 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,12 @@ github.com/Trendyol/kafka-cronsumer v1.5.1/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNu github.com/Trendyol/kafka-cronsumer v1.5.2-0.20240529192345-3622137cb12a/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/kafka-cronsumer v1.5.2 h1:wllvb2BhAWFnjN7wPVXVZ14lxPWAnLVlxKxoxIekQRI= github.com/Trendyol/kafka-cronsumer v1.5.2/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.3-0.20240602191646-ecba325784a5 h1:KVnsVsH/JKSzFjL7kDHMRjQzLzE/y40CB6PphVlsrwQ= +github.com/Trendyol/kafka-cronsumer v1.5.3-0.20240602191646-ecba325784a5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.3-0.20240603202122-cbc6017e8d40 h1:ogMKdiQIPmOqdDwAeoPHbXOsjFIcKXVbv9C4AVwZ8Hc= +github.com/Trendyol/kafka-cronsumer v1.5.3-0.20240603202122-cbc6017e8d40/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.3 h1:I3x7KUceHlae69MyBYx6Vj1ctMexeIKEUq2xNg0wvG8= +github.com/Trendyol/kafka-cronsumer v1.5.3/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= diff --git a/test/integration/go.mod b/test/integration/go.mod index 162f505..bf9a98d 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -10,7 +10,7 @@ require ( ) require ( - github.com/Trendyol/kafka-cronsumer v1.5.1 // indirect + github.com/Trendyol/kafka-cronsumer v1.5.3 // indirect github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect diff --git a/test/integration/go.sum b/test/integration/go.sum index 2489c9f..9579d06 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -2,6 +2,10 @@ github.com/Trendyol/kafka-cronsumer v1.5.0 h1:MI0/ncHrlCvOV0Ro4h9avm2izsNprBw4Qf github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/kafka-cronsumer v1.5.1 h1:L8RLxo8zSGOfVpjtXLUqL3PsJLZdeoFcOvN1yCY/GyQ= github.com/Trendyol/kafka-cronsumer v1.5.1/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.2/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.3-0.20240602191646-ecba325784a5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.3-0.20240603202122-cbc6017e8d40/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.3/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= diff --git a/verify_topic.go b/verify_topic.go new file mode 100644 index 0000000..19f20a7 --- /dev/null +++ b/verify_topic.go @@ -0,0 +1,70 @@ +package kafka + +import ( + "context" + "fmt" + + "github.com/segmentio/kafka-go" +) + +type kafkaClient interface { + Metadata(ctx context.Context, req *kafka.MetadataRequest) (*kafka.MetadataResponse, error) + GetClient() *kafka.Client +} + +type client struct { + *kafka.Client +} + +func newKafkaClient(cfg *ConsumerConfig) (kafkaClient, error) { + kc := client{ + Client: &kafka.Client{ + Addr: kafka.TCP(cfg.Reader.Brokers...), + }, + } + + transport := &Transport{ + Transport: &kafka.Transport{ + MetadataTopics: cfg.getTopics(), + }, + } + if err := fillLayer(transport, cfg.SASL, cfg.TLS); err != nil { + return nil, fmt.Errorf("error when initializing kafka client for verify topic purpose %w", err) + } + + kc.Transport = transport + return &kc, nil +} + +func (k *client) GetClient() *kafka.Client { + return k.Client +} + +func verifyTopics(client kafkaClient, cfg *ConsumerConfig) (bool, error) { + topics := cfg.getTopics() + + metadata, err := client.Metadata(context.Background(), &kafka.MetadataRequest{ + Topics: topics, + }) + if err != nil { + return false, fmt.Errorf("error when during verifyTopics metadata request %w", err) + } + return checkTopicsWithinMetadata(metadata, topics) +} + +func checkTopicsWithinMetadata(metadata *kafka.MetadataResponse, topics []string) (bool, error) { + metadataTopics := make(map[string]struct{}, len(metadata.Topics)) + for _, topic := range metadata.Topics { + if topic.Error != nil { + continue + } + metadataTopics[topic.Name] = struct{}{} + } + + for _, topic := range topics { + if _, exist := metadataTopics[topic]; !exist { + return false, nil + } + } + return true, nil +} diff --git a/verify_topic_test.go b/verify_topic_test.go new file mode 100644 index 0000000..0ee1e94 --- /dev/null +++ b/verify_topic_test.go @@ -0,0 +1,130 @@ +package kafka + +import ( + "context" + "errors" + "testing" + + "github.com/segmentio/kafka-go" +) + +type mockKafkaClientWrapper struct { + wantErr bool + wantExistTopic bool +} + +func (m mockKafkaClientWrapper) GetClient() *kafka.Client { + return &kafka.Client{} +} + +func (m mockKafkaClientWrapper) Metadata(_ context.Context, _ *kafka.MetadataRequest) (*kafka.MetadataResponse, error) { + if m.wantErr { + return nil, errors.New("metadataReqErr") + } + + if !m.wantExistTopic { + return &kafka.MetadataResponse{ + Topics: []kafka.Topic{ + {Name: "topic1", Error: kafka.UnknownTopicOrPartition}, + {Name: "topic2", Error: nil}, + }, + }, nil + } + + return &kafka.MetadataResponse{ + Topics: []kafka.Topic{ + {Name: "topic1", Error: nil}, + {Name: "topic2", Error: nil}, + }, + }, nil +} + +func Test_kafkaClientWrapper_VerifyTopics(t *testing.T) { + t.Run("Should_Return_Error_When_Metadata_Request_Has_Failed", func(t *testing.T) { + // Given + mockClient := mockKafkaClientWrapper{wantErr: true} + cfg := &ConsumerConfig{} + + // When + _, err := verifyTopics(mockClient, cfg) + + // Then + if err == nil { + t.Error("metadata request must be failed!") + } + }) + t.Run("Should_Return_False_When_Given_Topic_Does_Not_Exist", func(t *testing.T) { + // Given + mockClient := mockKafkaClientWrapper{wantExistTopic: false} + cfg := &ConsumerConfig{ + Reader: ReaderConfig{ + Topic: "topic1", + }, + } + + // When + exist, err := verifyTopics(mockClient, cfg) + + // Then + if exist { + t.Errorf("topic %s must not exist", cfg.Reader.Topic) + } + if err != nil { + t.Error("err must be nil") + } + }) + t.Run("Should_Return_True_When_Given_Topic_Exist", func(t *testing.T) { + // Given + mockClient := mockKafkaClientWrapper{wantExistTopic: true} + cfg := &ConsumerConfig{ + Reader: ReaderConfig{ + Topic: "topic1", + }, + } + + // When + exist, err := verifyTopics(mockClient, cfg) + + // Then + if !exist { + t.Errorf("topic %s must exist", cfg.Reader.Topic) + } + if err != nil { + t.Error("err must be nil") + } + }) +} + +func Test_newKafkaClient(t *testing.T) { + // Given + cfg := &ConsumerConfig{ + Reader: ReaderConfig{ + Topic: "topic", + Brokers: []string{"127.0.0.1:9092"}, + }, + } + + // When + client, err := newKafkaClient(cfg) + + // Then + if client.GetClient().Addr.String() != "127.0.0.1:9092" { + t.Errorf("broker address must be 127.0.0.1:9092") + } + if err != nil { + t.Errorf("err must be nil") + } +} + +func Test_kClient_GetClient(t *testing.T) { + // Given + mockClient := mockKafkaClientWrapper{} + + // When + client := mockClient.GetClient() + + // Then + if client == nil { + t.Error("client must not be nil") + } +}