Skip to content

Commit

Permalink
chore: remove statsd and use prometheus (#50)
Browse files Browse the repository at this point in the history
Co-authored-by: gagan.dhand <[email protected]>
  • Loading branch information
gdgagan696 and gdgagangeek authored Jul 18, 2024
1 parent 7b91894 commit 2fc48b8
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 55 deletions.
12 changes: 0 additions & 12 deletions changeEventProducer/kafka/prometheus/mapping.yml

This file was deleted.

36 changes: 18 additions & 18 deletions changeEventProducer/kafka/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,37 @@ package kafka
import (
"context"
"fmt"
"github.com/cactus/go-statsd-client/v5/statsd"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/segmentio/kafka-go"
"google.golang.org/protobuf/proto"
"log"
"time"
)

const (
SuccessCountMetric = "kafka.SuccessCount"
FailureCountMetric = "kafka.FailureCount"
var (
kafkaSuccess = promauto.NewCounter(prometheus.CounterOpts{
Name: "kafka_messages_success_count",
Help: "Success count of the successfully processed kafka messages",
})
kafkaFailure = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kafka_messages_failure_count",
Help: "Failure count of the kafka messages",
}, []string{"cause"})
)

type Writer struct {
kafkaWriter *kafka.Writer
statsdClient statsd.Statter
kafkaWriter *kafka.Writer
}

func NewWriter(kafkaBrokerUrl string, timeout int, retries int, statsdClient statsd.Statter) *Writer {
func NewWriter(kafkaBrokerUrl string, timeout int, retries int) *Writer {
writer := &kafka.Writer{
Addr: kafka.TCP(kafkaBrokerUrl),
WriteTimeout: time.Second * time.Duration(timeout),
MaxAttempts: retries,
}

return &Writer{kafkaWriter: writer, statsdClient: statsdClient}
return &Writer{kafkaWriter: writer}
}

func (w *Writer) Close() error {
Expand All @@ -50,15 +56,9 @@ func (w *Writer) Write(topic string, protoMessage proto.Message) error {
func (w *Writer) send(message kafka.Message) error {
err := w.kafkaWriter.WriteMessages(context.Background(), message)
if err != nil {
metricsErr := w.statsdClient.Inc(FailureCountMetric, 1, 1)
if metricsErr != nil {
log.Printf("Failed to increase Failure metric - %s", err.Error())
}
kafkaFailure.WithLabelValues("kafka_message_produce_error" + " " + err.Error()).Inc()
return err
}
err = w.statsdClient.Inc(SuccessCountMetric, 1, 1)
if err != nil {
log.Printf("Failed to increase Success metric - %s", err.Error())
}
kafkaSuccess.Inc()
return nil
}
7 changes: 0 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ type KafkaProducerConfig struct {
Timeout int `default:"5000"`
}

// StatsDConfig
type StatsDConfig struct {
Address string
Prefix string
}

// SchameChangeConfig
type SchemaChangeConfig struct {
KafkaTopic string
Expand All @@ -50,6 +44,5 @@ type Config struct {
NewRelic NewRelicConfig
DB DBConfig
KafkaProducer KafkaProducerConfig
StatsD StatsDConfig
SchemaChange SchemaChangeConfig
}
5 changes: 0 additions & 5 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ kafkaProducer:
retries: 5
timeout: 5000

# StatsD Config
statsd:
address: "localhost:9090"
prefix: "schema-change-event"

# Schema Change config
schemachange:
kafkatopic: "schema_change"
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.21
require (
github.com/MakeNowJust/heredoc v1.0.0
github.com/alecthomas/chroma v0.8.2
github.com/cactus/go-statsd-client/v5 v5.1.0
github.com/dgraph-io/ristretto v0.1.0
github.com/dustin/go-humanize v1.0.1
github.com/emicklei/dot v0.16.0
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx2
github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8=
github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50=
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE=
github.com/cactus/go-statsd-client/v5 v5.1.0 h1:sbbdfIl9PgisjEoXzvXI1lwUKWElngsjJKaZeC021P4=
github.com/cactus/go-statsd-client/v5 v5.1.0/go.mod h1:COEvJ1E+/E2L4q6QE5CkjWPi4eeDw9maJBMIuMPBZbY=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
Expand Down Expand Up @@ -858,7 +856,6 @@ github.com/jackc/puddle v1.1.3 h1:JnPg/5Q9xVJGfjsO5CPUOjnJps1JaRUm8I9FXVCFK94=
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jeremywohl/flatten v1.0.1 h1:LrsxmB3hfwJuE+ptGOijix1PIfOoKLJ3Uee/mzbgtrs=
github.com/jeremywohl/flatten v1.0.1/go.mod h1:4AmD/VxjWcI5SRB0n6szE2A6s2fsNHDLO0nAlMHgfLQ=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
Expand Down
11 changes: 2 additions & 9 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"github.com/goto/stencil/internal/prometheus"
newRelic2 "github.com/goto/stencil/pkg/newrelic"

"github.com/cactus/go-statsd-client/v5/statsd"

"github.com/dgraph-io/ristretto"
"github.com/goto/salt/spa"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
Expand Down Expand Up @@ -66,16 +64,11 @@ func Start(cfg config.Config) {

changeDetectorService := changedetector.NewService(newRelic)

statsdConfig := &statsd.ClientConfig{
Address: cfg.StatsD.Address,
Prefix: cfg.StatsD.Prefix,
}
fmt.Printf("Kafka Adress %s", cfg.KafkaProducer.BootstrapServer)
statsdClient, err := statsd.NewClientWithConfig(statsdConfig)
fmt.Printf("Kafka Address %s", cfg.KafkaProducer.BootstrapServer)
if err != nil {
log.Fatal("Error creating StatsD client:", err)
}
producer := kafka.NewWriter(cfg.KafkaProducer.BootstrapServer, cfg.KafkaProducer.Timeout, cfg.KafkaProducer.Retries, statsdClient)
producer := kafka.NewWriter(cfg.KafkaProducer.BootstrapServer, cfg.KafkaProducer.Timeout, cfg.KafkaProducer.Retries)

notificationEventRepo := postgres.NewNotificationEventRepository(db)

Expand Down

0 comments on commit 2fc48b8

Please sign in to comment.