diff --git a/changeEventProducer/kafka/prometheus/mapping.yml b/changeEventProducer/kafka/prometheus/mapping.yml deleted file mode 100644 index aa85b7d3..00000000 --- a/changeEventProducer/kafka/prometheus/mapping.yml +++ /dev/null @@ -1,12 +0,0 @@ -mappings: - - match: "kafka.SuccessCount.*" - name: "kafka_success_count" - type: counter - labels: - application: "change_event_kafka_producer" - - match: "kafka.FailureCount.*" - name: "kafka_failure_count" - type: counter - labels: - application: "change_event_kafka_producer" - diff --git a/changeEventProducer/kafka/writer.go b/changeEventProducer/kafka/writer.go index 662db99a..8725235e 100644 --- a/changeEventProducer/kafka/writer.go +++ b/changeEventProducer/kafka/writer.go @@ -3,31 +3,37 @@ package kafka import ( "context" "fmt" - "github.com/cactus/go-statsd-client/v5/statsd" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/segmentio/kafka-go" "google.golang.org/protobuf/proto" - "log" - "time" ) -const ( - SuccessCountMetric = "kafka.SuccessCount" - FailureCountMetric = "kafka.FailureCount" +var ( + kafkaSuccess = promauto.NewCounter(prometheus.CounterOpts{ + Name: "kafka_messages_success_count", + Help: "Success count of the successfully processed kafka messages", + }) + kafkaFailure = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "kafka_messages_failure_count", + Help: "Failure count of the kafka messages", + }, []string{"cause"}) ) type Writer struct { - kafkaWriter *kafka.Writer - statsdClient statsd.Statter + kafkaWriter *kafka.Writer } -func NewWriter(kafkaBrokerUrl string, timeout int, retries int, statsdClient statsd.Statter) *Writer { +func NewWriter(kafkaBrokerUrl string, timeout int, retries int) *Writer { writer := &kafka.Writer{ Addr: kafka.TCP(kafkaBrokerUrl), WriteTimeout: time.Second * time.Duration(timeout), MaxAttempts: retries, } - return &Writer{kafkaWriter: writer, statsdClient: statsdClient} + return &Writer{kafkaWriter: writer} } func (w *Writer) Close() error { @@ -50,15 +56,9 @@ func (w *Writer) Write(topic string, protoMessage proto.Message) error { func (w *Writer) send(message kafka.Message) error { err := w.kafkaWriter.WriteMessages(context.Background(), message) if err != nil { - metricsErr := w.statsdClient.Inc(FailureCountMetric, 1, 1) - if metricsErr != nil { - log.Printf("Failed to increase Failure metric - %s", err.Error()) - } + kafkaFailure.WithLabelValues("kafka_message_produce_error" + " " + err.Error()).Inc() return err } - err = w.statsdClient.Inc(SuccessCountMetric, 1, 1) - if err != nil { - log.Printf("Failed to increase Success metric - %s", err.Error()) - } + kafkaSuccess.Inc() return nil } diff --git a/config/config.go b/config/config.go index 33b2d9c5..ad194fad 100644 --- a/config/config.go +++ b/config/config.go @@ -27,12 +27,6 @@ type KafkaProducerConfig struct { Timeout int `default:"5000"` } -// StatsDConfig -type StatsDConfig struct { - Address string - Prefix string -} - // SchameChangeConfig type SchemaChangeConfig struct { KafkaTopic string @@ -50,6 +44,5 @@ type Config struct { NewRelic NewRelicConfig DB DBConfig KafkaProducer KafkaProducerConfig - StatsD StatsDConfig SchemaChange SchemaChangeConfig } diff --git a/config/config.yaml b/config/config.yaml index 949444b5..d6fc73c1 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -30,11 +30,6 @@ kafkaProducer: retries: 5 timeout: 5000 -# StatsD Config -statsd: - address: "localhost:9090" - prefix: "schema-change-event" - # Schema Change config schemachange: kafkatopic: "schema_change" diff --git a/go.mod b/go.mod index 8277d269..f63bff7e 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.21 require ( github.com/MakeNowJust/heredoc v1.0.0 github.com/alecthomas/chroma v0.8.2 - github.com/cactus/go-statsd-client/v5 v5.1.0 github.com/dgraph-io/ristretto v0.1.0 github.com/dustin/go-humanize v1.0.1 github.com/emicklei/dot v0.16.0 diff --git a/go.sum b/go.sum index 8138aeac..f33a216c 100644 --- a/go.sum +++ b/go.sum @@ -216,8 +216,6 @@ github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx2 github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8= github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50= github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE= -github.com/cactus/go-statsd-client/v5 v5.1.0 h1:sbbdfIl9PgisjEoXzvXI1lwUKWElngsjJKaZeC021P4= -github.com/cactus/go-statsd-client/v5 v5.1.0/go.mod h1:COEvJ1E+/E2L4q6QE5CkjWPi4eeDw9maJBMIuMPBZbY= github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= @@ -858,7 +856,6 @@ github.com/jackc/puddle v1.1.3 h1:JnPg/5Q9xVJGfjsO5CPUOjnJps1JaRUm8I9FXVCFK94= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jeremywohl/flatten v1.0.1 h1:LrsxmB3hfwJuE+ptGOijix1PIfOoKLJ3Uee/mzbgtrs= github.com/jeremywohl/flatten v1.0.1/go.mod h1:4AmD/VxjWcI5SRB0n6szE2A6s2fsNHDLO0nAlMHgfLQ= -github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ= diff --git a/internal/server/server.go b/internal/server/server.go index 365066d3..782c261f 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -13,8 +13,6 @@ import ( "github.com/goto/stencil/internal/prometheus" newRelic2 "github.com/goto/stencil/pkg/newrelic" - "github.com/cactus/go-statsd-client/v5/statsd" - "github.com/dgraph-io/ristretto" "github.com/goto/salt/spa" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" @@ -66,16 +64,11 @@ func Start(cfg config.Config) { changeDetectorService := changedetector.NewService(newRelic) - statsdConfig := &statsd.ClientConfig{ - Address: cfg.StatsD.Address, - Prefix: cfg.StatsD.Prefix, - } - fmt.Printf("Kafka Adress %s", cfg.KafkaProducer.BootstrapServer) - statsdClient, err := statsd.NewClientWithConfig(statsdConfig) + fmt.Printf("Kafka Address %s", cfg.KafkaProducer.BootstrapServer) if err != nil { log.Fatal("Error creating StatsD client:", err) } - producer := kafka.NewWriter(cfg.KafkaProducer.BootstrapServer, cfg.KafkaProducer.Timeout, cfg.KafkaProducer.Retries, statsdClient) + producer := kafka.NewWriter(cfg.KafkaProducer.BootstrapServer, cfg.KafkaProducer.Timeout, cfg.KafkaProducer.Retries) notificationEventRepo := postgres.NewNotificationEventRepository(db)