diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..efffe1e --- /dev/null +++ b/Makefile @@ -0,0 +1,31 @@ +## help: print this help message +help: + @echo "Usage:" + @sed -n 's/^##//p' ${MAKEFILE_LIST} | column -t -s ":" | sed -e 's/^/ /' + +## lint: runs golangci lint based on .golangci.yml configuration +.PHONY: lint +lint: + @if ! test -f `go env GOPATH`/bin/golangci-lint; then go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.0; fi + golangci-lint run -c .golangci.yml --fix -v + +## test: runs tests +.PHONY: test +test: + go test -v ./... -coverprofile=unit_coverage.out -short + +## unit-coverage-html: extract unit tests coverage to html format +.PHONY: unit-coverage-html +unit-coverage-html: + make test + go tool cover -html=unit_coverage.out -o unit_coverage.html + +## godoc: generate documentation +.PHONY: godoc +godoc: + @if ! test -f `go env GOPATH`/bin/godoc; then go install golang.org/x/tools/cmd/godoc; fi + godoc -http=127.0.0.1:6060 + +.PHONY: integration-compose +integration-compose: + docker compose -f test/integration/docker-compose.yml up --wait --build --force-recreate --remove-orphans diff --git a/api.go b/api.go index cdf7e0c..6ccc83b 100644 --- a/api.go +++ b/api.go @@ -3,9 +3,9 @@ package kafka import ( "fmt" - "github.com/gofiber/adaptor/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/gofiber/fiber/v2" - "github.com/prometheus/client_golang/prometheus/promhttp" ) type API interface { @@ -19,7 +19,7 @@ type server struct { logger LoggerInterface } -func NewAPI(cfg *ConsumerConfig) API { +func NewAPI(cfg *ConsumerConfig, consumer Consumer, metricCollectors ...prometheus.Collector) API { setDefaults(cfg) svr := server{ @@ -34,7 +34,14 @@ func NewAPI(cfg *ConsumerConfig) API { ), } - svr.fiber.Get(*cfg.MetricConfiguration.Path, adaptor.HTTPHandler(promhttp.Handler())) + metricMiddleware, err := NewMetricMiddleware(cfg, svr.fiber, consumer, metricCollectors...) + + if err == nil { + svr.fiber.Use(metricMiddleware) + } else { + svr.logger.Errorf("metric middleware cannot be initialized: %v", err) + } + svr.fiber.Get(*cfg.APIConfiguration.HealthCheckPath, svr.HealthCheckHandler) return &svr diff --git a/batch_consumer.go b/batch_consumer.go index ebc2416..cc5d669 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -5,12 +5,12 @@ import ( "time" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" - "github.com/Trendyol/kafka-konsumer/instrumentation" "github.com/segmentio/kafka-go" ) type batchConsumer struct { *base + metric *ConsumerMetric consumeFn func([]Message) error @@ -38,7 +38,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { } if cfg.APIEnabled { - c.base.setupAPI(cfg) + c.base.setupAPI(cfg, &c, c.base.cronsumer.GetMetricCollectors()...) } return &c, nil @@ -54,6 +54,10 @@ func (b *batchConsumer) Consume() { } } +func (b *batchConsumer) GetMetric() *ConsumerMetric { + return b.metric +} + func (b *batchConsumer) startBatch() { defer b.wg.Done() @@ -111,10 +115,10 @@ func (b *batchConsumer) process(messages []Message) { commitErr := b.r.CommitMessages(context.Background(), segmentioMessages...) if commitErr != nil { - instrumentation.TotalUnprocessedBatchMessagesCounter.Inc() + b.metric.TotalUnprocessedBatchMessagesCounter++ b.logger.Error("Error Committing messages %s", commitErr.Error()) return } - instrumentation.TotalProcessedBatchMessagesCounter.Inc() + b.metric.TotalProcessedBatchMessagesCounter++ } diff --git a/collector.go b/collector.go new file mode 100644 index 0000000..4e49a81 --- /dev/null +++ b/collector.go @@ -0,0 +1,98 @@ +package kafka + +import ( + "github.com/ansrivas/fiberprometheus/v2" + "github.com/gofiber/fiber/v2" + "github.com/prometheus/client_golang/prometheus" +) + +const Name = "kafka_konsumer_" + +type metricCollector struct { + consumer Consumer + + totalUnprocessedMessagesCounter *prometheus.Desc + totalProcessedMessagesCounter *prometheus.Desc + totalUnprocessedBatchMessagesCounter *prometheus.Desc + totalProcessedBatchMessagesCounter *prometheus.Desc +} + +func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) { + prometheus.DescribeByCollect(s, ch) +} + +func (s *metricCollector) Collect(ch chan<- prometheus.Metric) { + consumerMetric := s.consumer.GetMetric() + + ch <- prometheus.MustNewConstMetric( + s.totalProcessedMessagesCounter, + prometheus.CounterValue, + float64(consumerMetric.TotalProcessedMessagesCounter), + []string{}..., + ) + + ch <- prometheus.MustNewConstMetric( + s.totalUnprocessedMessagesCounter, + prometheus.CounterValue, + float64(consumerMetric.TotalUnprocessedMessagesCounter), + []string{}..., + ) + ch <- prometheus.MustNewConstMetric( + s.totalProcessedBatchMessagesCounter, + prometheus.CounterValue, + float64(consumerMetric.TotalProcessedBatchMessagesCounter), + []string{}..., + ) + + ch <- prometheus.MustNewConstMetric( + s.totalUnprocessedBatchMessagesCounter, + prometheus.CounterValue, + float64(consumerMetric.TotalUnprocessedBatchMessagesCounter), + []string{}..., + ) +} + +func newMetricCollector(consumer Consumer) *metricCollector { + return &metricCollector{ + consumer: consumer, + + totalProcessedMessagesCounter: prometheus.NewDesc( + prometheus.BuildFQName(Name, "processed_messages_total", "current"), + "Total number of processed messages.", + []string{}, + nil, + ), + totalUnprocessedMessagesCounter: prometheus.NewDesc( + prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"), + "Total number of unprocessed messages.", + []string{}, + nil, + ), + totalProcessedBatchMessagesCounter: prometheus.NewDesc( + prometheus.BuildFQName(Name, "processed_batch_messages_total", "current"), + "Total number of processed batch messages.", + []string{}, + nil, + ), + totalUnprocessedBatchMessagesCounter: prometheus.NewDesc( + prometheus.BuildFQName(Name, "unprocessed_batch_messages_total", "current"), + "Total number of unprocessed batch messages.", + []string{}, + nil, + ), + } +} + +func NewMetricMiddleware(cfg *ConsumerConfig, + app *fiber.App, + consumer Consumer, + metricCollectors ...prometheus.Collector, +) (func(ctx *fiber.Ctx) error, error) { + prometheus.DefaultRegisterer.MustRegister(newMetricCollector(consumer)) + prometheus.DefaultRegisterer.MustRegister(metricCollectors...) + + fiberPrometheus := fiberprometheus.New(cfg.Reader.GroupID) + fiberPrometheus.RegisterAt(app, *cfg.MetricConfiguration.Path) + + return fiberPrometheus.Middleware, nil +} diff --git a/consumer.go b/consumer.go index 4925a12..f27bb64 100644 --- a/consumer.go +++ b/consumer.go @@ -4,12 +4,12 @@ import ( "context" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" - "github.com/Trendyol/kafka-konsumer/instrumentation" "github.com/segmentio/kafka-go" ) type consumer struct { *base + metric *ConsumerMetric consumeFn func(Message) error } @@ -22,6 +22,7 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) { c := consumer{ base: consumerBase, + metric: &ConsumerMetric{}, consumeFn: cfg.ConsumeFn, } @@ -32,7 +33,7 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) { } if cfg.APIEnabled { - c.base.setupAPI(cfg) + c.base.setupAPI(cfg, &c, c.base.cronsumer.GetMetricCollectors()...) } return &c, nil @@ -54,6 +55,10 @@ func (c *consumer) Consume() { } } +func (c *consumer) GetMetric() *ConsumerMetric { + return c.metric +} + func (c *consumer) process(message Message) { consumeErr := c.consumeFn(message) @@ -74,10 +79,10 @@ func (c *consumer) process(message Message) { commitErr := c.r.CommitMessages(context.Background(), kafka.Message(message)) if commitErr != nil { - instrumentation.TotalUnprocessedMessagesCounter.Inc() + c.metric.TotalUnprocessedMessagesCounter++ c.logger.Errorf("Error Committing message %s, %s", string(message.Value), commitErr.Error()) return } - instrumentation.TotalProcessedMessagesCounter.Inc() + c.metric.TotalProcessedMessagesCounter++ } diff --git a/consumer_base.go b/consumer_base.go index e6e9105..0b1e61a 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -4,6 +4,8 @@ import ( "context" "sync" + "github.com/prometheus/client_golang/prometheus" + cronsumer "github.com/Trendyol/kafka-cronsumer" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" "github.com/segmentio/kafka-go" @@ -13,6 +15,7 @@ type Consumer interface { Consume() WithLogger(logger LoggerInterface) Stop() error + GetMetric() *ConsumerMetric } type base struct { @@ -71,9 +74,9 @@ func (c *base) setupCronsumer(cfg *ConsumerConfig, retryFn func(kcronsumer.Messa c.subprocesses.Add(c.cronsumer) } -func (c *base) setupAPI(cfg *ConsumerConfig) { +func (c *base) setupAPI(cfg *ConsumerConfig, consumer Consumer, metricCollectors ...prometheus.Collector) { c.logger.Debug("Initializing API") - c.api = NewAPI(cfg) + c.api = NewAPI(cfg, consumer, metricCollectors...) c.subprocesses.Add(c.api) } diff --git a/go.mod b/go.mod index 09b80c4..9ecd78b 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module github.com/Trendyol/kafka-konsumer go 1.19 require ( - github.com/Trendyol/kafka-cronsumer v1.2.1 - github.com/gofiber/adaptor/v2 v2.2.1 - github.com/gofiber/fiber/v2 v2.44.0 - github.com/prometheus/client_golang v1.15.1 + github.com/Trendyol/kafka-cronsumer v1.2.2 + github.com/ansrivas/fiberprometheus/v2 v2.6.1 + github.com/gofiber/fiber/v2 v2.48.0 + github.com/prometheus/client_golang v1.16.0 github.com/segmentio/kafka-go v0.4.40 go.uber.org/zap v1.24.0 ) @@ -15,33 +15,29 @@ require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/gofiber/adaptor/v2 v2.2.1 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.3.0 // indirect - github.com/klauspost/compress v1.16.5 // indirect + github.com/klauspost/compress v1.16.6 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.18 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/philhofer/fwd v1.1.2 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect - github.com/prometheus/client_model v0.3.0 // indirect - github.com/prometheus/common v0.42.0 // indirect - github.com/prometheus/procfs v0.9.0 // indirect + github.com/prometheus/client_model v0.4.0 // indirect + github.com/prometheus/common v0.44.0 // indirect + github.com/prometheus/procfs v0.11.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect - github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94 // indirect - github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect - github.com/tinylib/msgp v1.1.8 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasthttp v1.47.0 // indirect + github.com/valyala/fasthttp v1.48.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.8.0 // indirect + golang.org/x/sys v0.10.0 // indirect golang.org/x/text v0.9.0 // indirect google.golang.org/protobuf v1.30.0 // indirect ) diff --git a/go.sum b/go.sum index 271a356..2a5b33b 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,9 @@ -github.com/Trendyol/kafka-cronsumer v1.2.1 h1:BC/YDRE1W7OfDCSk4KD8pRyVSLKR929zvqi5gNGq3Mc= -github.com/Trendyol/kafka-cronsumer v1.2.1/go.mod h1:GzV1DUvjUTco+Qk4zR2GLfWblFszKTuYY9Epx8d7ROM= +github.com/Trendyol/kafka-cronsumer v1.2.2 h1:Pxyd5xVZYmwkPGDfqzWhbcwaE5TFzipz2yxaqwEwEq8= +github.com/Trendyol/kafka-cronsumer v1.2.2/go.mod h1:7/eJ5jRqtDewE7dXlAoAWyDN5OEcskXJoyCS25bf+bM= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= +github.com/ansrivas/fiberprometheus/v2 v2.6.1/go.mod h1:MloIKvy4yN6hVqlRpJ/jDiR244YnWJaQC0FIqS8A+MY= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -12,10 +14,9 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gofiber/adaptor/v2 v2.2.1 h1:givE7iViQWlsTR4Jh7tB4iXzrlKBgiraB/yTdHs9Lv4= github.com/gofiber/adaptor/v2 v2.2.1/go.mod h1:AhR16dEqs25W2FY/l8gSj1b51Azg5dtPDmm+pruNOrc= -github.com/gofiber/fiber/v2 v2.44.0 h1:Z90bEvPcJM5GFJnu1py0E1ojoerkyew3iiNJ78MQCM8= -github.com/gofiber/fiber/v2 v2.44.0/go.mod h1:VTMtb/au8g01iqvHyaCzftuM/xmZgKOZCtFzz6CdV9w= +github.com/gofiber/fiber/v2 v2.48.0 h1:cRVMCb9aUJDsyHxGFLwz/sGzDggdailZZyptU9F9cU0= +github.com/gofiber/fiber/v2 v2.48.0/go.mod h1:xqJgfqrc23FJuqGOW6DVgi3HyZEm2Mn9pRqUb2kHSX8= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= @@ -24,44 +25,36 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= -github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= +github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98= -github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= -github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= -github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw= -github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI= -github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= -github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= -github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= -github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= -github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= -github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= -github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= +github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= +github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= +github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= +github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= +github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk= +github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94 h1:rmMl4fXJhKMNWl+K+r/fq4FbbKI+Ia2m9hYBLm2h4G4= -github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94/go.mod h1:90zrgN3D/WJsDd1iXHT96alCoN2KJo6/4x1DZC3wZs8= -github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d/go.mod h1:Gy+0tqhJvgGlqnTF8CVGP0AaGRjwBtXs/a5PA0Y3+A4= -github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= -github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= github.com/segmentio/kafka-go v0.4.40 h1:sszW7c0/uyv7+VcTW5trx2ZC7kMWDTxuR/6Zn8U1bm8= github.com/segmentio/kafka-go v0.4.40/go.mod h1:naFEZc5MQKdeL3W6NkZIAn48Y6AazqjRFDhnXeg3h94= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -69,13 +62,10 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw= -github.com/tinylib/msgp v1.1.8 h1:FCXC1xanKO4I8plpHGH2P7koL/RzZs12l/+r7vakfm0= -github.com/tinylib/msgp v1.1.8/go.mod h1:qkpG+2ldGg4xRFmx+jfTvZPxfGFhi64BcnL9vkCm/Tw= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.47.0 h1:y7moDoxYzMooFpT5aHgNgVOQDrS3qlkfiP9mDtGGK9c= -github.com/valyala/fasthttp v1.47.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= +github.com/valyala/fasthttp v1.48.0 h1:oJWvHb9BIZToTQS3MuQ2R3bJZiNSa2KiNdeI8A+79Tc= +github.com/valyala/fasthttp v1.48.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= @@ -84,7 +74,6 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= -github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -94,56 +83,37 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8 go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= diff --git a/instrumentation/counter.go b/instrumentation/counter.go deleted file mode 100644 index 89c4ca1..0000000 --- a/instrumentation/counter.go +++ /dev/null @@ -1,34 +0,0 @@ -package instrumentation - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -var TotalProcessedMessagesCounter = promauto.NewCounter( - prometheus.CounterOpts{ - Name: "kafka_konsumer_processed_messages_total", - Help: "Total number of processed messages.", - }, -) - -var TotalUnprocessedMessagesCounter = promauto.NewCounter( - prometheus.CounterOpts{ - Name: "kafka_konsumer_unprocessed_messages_total", - Help: "Total number of unprocessed messages.", - }, -) - -var TotalProcessedBatchMessagesCounter = promauto.NewCounter( - prometheus.CounterOpts{ - Name: "kafka_konsumer_processed_batch_messages_total", - Help: "Total number of processed batch messages.", - }, -) - -var TotalUnprocessedBatchMessagesCounter = promauto.NewCounter( - prometheus.CounterOpts{ - Name: "kafka_konsumer_unprocessed_batch_messages_total", - Help: "Total number of unprocessed batch messages.", - }, -) diff --git a/metric.go b/metric.go new file mode 100644 index 0000000..6fabe6f --- /dev/null +++ b/metric.go @@ -0,0 +1,8 @@ +package kafka + +type ConsumerMetric struct { + TotalUnprocessedMessagesCounter int64 + TotalProcessedMessagesCounter int64 + TotalUnprocessedBatchMessagesCounter int64 + TotalProcessedBatchMessagesCounter int64 +}