docs: example and readme update
Abdulsametileri committed Jul 7, 2023
1 parent 262bed3 commit 51c1142
Showing 3 changed files with 25 additions and 50 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -22,7 +22,7 @@ You can find a number of ready-to-run examples at [this directory](examples).
After running the `docker-compose up` command, you can run any application you want.

<details>
<summary>Without Retry/Exception Manager</summary>
<summary>Simple Consumer</summary>

func main() {
consumerCfg := &kafka.ConsumerConfig{
@@ -49,7 +49,7 @@ After running the `docker-compose up` command, you can run any application you want.
</details>
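
The "Simple Consumer" snippet above is truncated in this diff view. Below is a minimal sketch of what a complete program might look like, assuming the library exposes `kafka.NewConsumer`, `Consume`, and `Stop` along with `Reader`/`ReaderConfig` fields (only the `ConsumerConfig` literal and `ConsumeFn` are visible in this commit); the topic and group names are placeholders, and the broker address is reused from the with-grafana example.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"

	"github.com/Trendyol/kafka-konsumer"
)

func main() {
	// The ConsumerConfig literal mirrors the truncated README snippet; Reader,
	// ReaderConfig, NewConsumer, Consume, and Stop are assumptions about the
	// library's API, not confirmed by this diff.
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"}, // broker address reused from examples/with-grafana
			Topic:   "sample-topic",              // placeholder topic name
			GroupID: "sample-consumer-group",     // placeholder consumer group
		},
		ConsumeFn: func(message kafka.Message) error {
			fmt.Printf("Message from %s with value %s is consumed\n", message.Topic, string(message.Value))
			return nil
		},
	}

	consumer, err := kafka.NewConsumer(consumerCfg)
	if err != nil {
		panic(err)
	}
	defer consumer.Stop()

	consumer.Consume()
	fmt.Println("Consumer started!")

	// wait for an interrupt signal to gracefully shut down, as in the with-grafana example below
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	<-c
}
```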

<details>
<summary>With Retry/Exception Option Enabled</summary>
<summary>Simple Consumer With Retry/Exception Option</summary>

func main() {
consumerCfg := &kafka.ConsumerConfig{
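The retry-enabled README snippet is likewise truncated here. A hedged sketch of what the full example might look like, assuming the library exposes a `RetryEnabled` flag and a `RetryConfiguration` struct: only `MaxRetry: 3` and the error-returning `ConsumeFn` are visible in this commit's with-grafana diff, so the remaining field names are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"os/signal"

	"github.com/Trendyol/kafka-konsumer"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "sample-topic",          // placeholder
			GroupID: "sample-consumer-group", // placeholder
		},
		// RetryEnabled and RetryConfiguration are assumed option names; only
		// MaxRetry appears in this commit's diff. The idea is that a failed
		// message is routed to a retry/exception topic and re-consumed up to
		// MaxRetry times.
		RetryEnabled: true,
		RetryConfiguration: kafka.RetryConfiguration{
			Brokers:  []string{"localhost:29092"}, // assumed field
			Topic:    "retry-topic",               // assumed field, placeholder name
			MaxRetry: 3,                           // visible in examples/with-grafana/main.go below
		},
		ConsumeFn: func(message kafka.Message) error {
			// returning an error is what triggers the retry/exception flow
			return errors.New("simulated processing failure")
		},
	}

	consumer, err := kafka.NewConsumer(consumerCfg)
	if err != nil {
		panic(err)
	}
	defer consumer.Stop()

	consumer.Consume()
	fmt.Println("Consumer started!")

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	<-c
}
```
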
63 changes: 19 additions & 44 deletions examples/with-grafana/main.go
@@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"strconv"
"sync/atomic"
"time"

"github.com/Trendyol/kafka-konsumer"
@@ -27,47 +27,31 @@ var messages = []user{
}

func main() {
// retryMap stores the number of retries for each message
var retryMap = make(map[int]int, len(messages))
for _, message := range messages {
retryMap[message.ID] = 0
}

// create new kafka producer
producer, _ := kafka.NewProducer(kafka.ProducerConfig{
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
})
defer producer.Close()

// produce messages at 1 seconds interval
ticker := time.NewTicker(1 * time.Second)
quit := make(chan struct{})
go func() {
// to find the message we will produce at the next interval
var i uint64
for {
select {
case <-ticker.C:
message := messages[atomic.LoadUint64(&i)]
bytes, _ := json.Marshal(message)

_ = producer.Produce(context.Background(), kafka.Message{
Topic: "konsumer",
Key: []byte(strconv.Itoa(message.ID)),
Value: bytes,
})

if message.ID == messages[len(messages)-1].ID {
quit <- struct{}{}
return
}

atomic.AddUint64(&i, 1)
case <-quit:
ticker.Stop()
return
	// produce messages at 1-second intervals
i := 0
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
if i == len(messages) {
break
}
message := messages[i]
bytes, _ := json.Marshal(message)

_ = producer.Produce(context.Background(), kafka.Message{
Topic: "konsumer",
Key: []byte(strconv.Itoa(message.ID)),
Value: bytes,
})
i++
}
}()

@@ -88,16 +71,8 @@ func main() {
MaxRetry: 3,
},
ConsumeFn: func(message kafka.Message) error {
u := &user{}
if err := json.Unmarshal(message.Value, u); err != nil {
return err
}

n := retryMap[u.ID]
if n < 3 {
retryMap[u.ID] += 1
return fmt.Errorf("message %s retrying, current retry count: %d", message.Key, n)
}
// mocking some background task
time.Sleep(1 * time.Second)

fmt.Printf("Message from %s with value %s is consumed successfully\n", message.Topic, string(message.Value))
return nil
@@ -113,7 +88,7 @@ func main() {

fmt.Println("Consumer started!")

// wait for interrupt signal to gracefully shutdown the consumer
// wait for interrupt signal to gracefully shut down the consumer
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
8 changes: 4 additions & 4 deletions examples/with-grafana/prometheus/alerts.yml
@@ -1,11 +1,11 @@
groups:
- name: konsumer-alerts
rules:
- alert: ProcessedMessageDecreasing
expr: sum(rate(kafka_konsumer_processed_messages_total{job="konsumer"}[1m])) < 0.1
- alert: UnprocessedMessageIncreasing
expr: increase(kafka_konsumer_unprocessed_messages_total{job="konsumer"}[5m]) > 0
for: 5m
labels:
severity: "critical"
annotations:
summary: "Kafka Konsumer processed message decreasing"
description: "Kafka Konsumer processed message decreasing, current rate: {{ $value }} <http://localhost:3000/d/DlIdtG_4z/kafka-konsumer-dashboard|Grafana>"
summary: "Kafka Konsumer unprocessed message increasing"
description: "Kafka Konsumer unprocessed message increasing, current value: {{ $value }} <http://localhost:3000/d/DlIdtG_4z/kafka-konsumer-dashboard|Grafana>"
