From 51c11428330dfbfc0c89afb65d14a8656d2ce9b8 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Fri, 7 Jul 2023 09:57:46 +0300 Subject: [PATCH] docs: example and readme update --- README.md | 4 +- examples/with-grafana/main.go | 63 +++++++-------------- examples/with-grafana/prometheus/alerts.yml | 8 +-- 3 files changed, 25 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index 9133104..f8ba462 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ You can find a number of ready-to-run examples at [this directory](examples). After running `docker-compose up` command, you can run any application you want.
- Without Retry/Exception Manager + Simple Consumer func main() { consumerCfg := &kafka.ConsumerConfig{ @@ -49,7 +49,7 @@ After running `docker-compose up` command, you can run any application you want.
- With Retry/Exception Option Enabled + Simple Consumer With Retry/Exception Option func main() { consumerCfg := &kafka.ConsumerConfig{ diff --git a/examples/with-grafana/main.go b/examples/with-grafana/main.go index 56afab5..17a299b 100644 --- a/examples/with-grafana/main.go +++ b/examples/with-grafana/main.go @@ -7,7 +7,6 @@ import ( "os" "os/signal" "strconv" - "sync/atomic" "time" "github.com/Trendyol/kafka-konsumer" @@ -27,47 +26,31 @@ var messages = []user{ } func main() { - // retryMap stores the number of retries for each message - var retryMap = make(map[int]int, len(messages)) - for _, message := range messages { - retryMap[message.ID] = 0 - } - // create new kafka producer producer, _ := kafka.NewProducer(kafka.ProducerConfig{ Writer: kafka.WriterConfig{ Brokers: []string{"localhost:29092"}, }, }) + defer producer.Close() - // produce messages at 1 seconds interval - ticker := time.NewTicker(1 * time.Second) - quit := make(chan struct{}) go func() { - // to find the message we will produce at the next interval - var i uint64 - for { - select { - case <-ticker.C: - message := messages[atomic.LoadUint64(&i)] - bytes, _ := json.Marshal(message) - - _ = producer.Produce(context.Background(), kafka.Message{ - Topic: "konsumer", - Key: []byte(strconv.Itoa(message.ID)), - Value: bytes, - }) - - if message.ID == messages[len(messages)-1].ID { - quit <- struct{}{} - return - } - - atomic.AddUint64(&i, 1) - case <-quit: - ticker.Stop() - return + // produce messages at 1 seconds interval + i := 0 + ticker := time.NewTicker(1 * time.Second) + for range ticker.C { + if i == len(messages) { + break } + message := messages[i] + bytes, _ := json.Marshal(message) + + _ = producer.Produce(context.Background(), kafka.Message{ + Topic: "konsumer", + Key: []byte(strconv.Itoa(message.ID)), + Value: bytes, + }) + i++ } }() @@ -88,16 +71,8 @@ func main() { MaxRetry: 3, }, ConsumeFn: func(message kafka.Message) error { - u := &user{} - if err := json.Unmarshal(message.Value, u); err != nil { - return err - } - - n := retryMap[u.ID] - if n < 3 { - retryMap[u.ID] += 1 - return fmt.Errorf("message %s retrying, current retry count: %d", message.Key, n) - } + // mocking some background task + time.Sleep(1 * time.Second) fmt.Printf("Message from %s with value %s is consumed successfully\n", message.Topic, string(message.Value)) return nil @@ -113,7 +88,7 @@ func main() { fmt.Println("Consumer started!") - // wait for interrupt signal to gracefully shutdown the consumer + // wait for interrupt signal to gracefully shut down the consumer c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) <-c diff --git a/examples/with-grafana/prometheus/alerts.yml b/examples/with-grafana/prometheus/alerts.yml index cad1ba6..853050c 100644 --- a/examples/with-grafana/prometheus/alerts.yml +++ b/examples/with-grafana/prometheus/alerts.yml @@ -1,11 +1,11 @@ groups: - name: konsumer-alerts rules: - - alert: ProcessedMessageDecreasing - expr: sum(rate(kafka_konsumer_processed_messages_total{job="konsumer"}[1m])) < 0.1 + - alert: UnprocessedMessageIncreasing + expr: increase(kafka_konsumer_unprocessed_messages_total{job="konsumer"}[5m]) > 0 for: 5m labels: severity: "critical" annotations: - summary: "Kafka Konsumer processed message decreasing" - description: "Kafka Konsumer processed message decreasing, current rate: {{ $value }} " + summary: "Kafka Konsumer unprocessed message increasing" + description: "Kafka Konsumer unprocessed message increasing, current value: {{ $value }} "