Skip to content

Commit

Permalink
feat: integrate cronsumer metric collector to konsumer (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzyildirim authored Aug 20, 2023
1 parent cf9bdfe commit 68f6651
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 116 deletions.
31 changes: 31 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
## help: print this help message
help:
@echo "Usage:"
@sed -n 's/^##//p' ${MAKEFILE_LIST} | column -t -s ":" | sed -e 's/^/ /'

## lint: runs golangci lint based on .golangci.yml configuration
.PHONY: lint
lint:
@if ! test -f `go env GOPATH`/bin/golangci-lint; then go install github.com/golangci/golangci-lint/cmd/[email protected]; fi
golangci-lint run -c .golangci.yml --fix -v

## test: runs tests
.PHONY: test
test:
go test -v ./... -coverprofile=unit_coverage.out -short

## unit-coverage-html: extract unit tests coverage to html format
.PHONY: unit-coverage-html
unit-coverage-html:
make test
go tool cover -html=unit_coverage.out -o unit_coverage.html

## godoc: generate documentation
.PHONY: godoc
godoc:
@if ! test -f `go env GOPATH`/bin/godoc; then go install golang.org/x/tools/cmd/godoc; fi
godoc -http=127.0.0.1:6060

.PHONY: integration-compose
integration-compose:
docker compose -f test/integration/docker-compose.yml up --wait --build --force-recreate --remove-orphans
15 changes: 11 additions & 4 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package kafka
import (
"fmt"

"github.com/gofiber/adaptor/v2"
"github.com/prometheus/client_golang/prometheus"

"github.com/gofiber/fiber/v2"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type API interface {
Expand All @@ -19,7 +19,7 @@ type server struct {
logger LoggerInterface
}

func NewAPI(cfg *ConsumerConfig) API {
func NewAPI(cfg *ConsumerConfig, consumerMetric *ConsumerMetric, metricCollectors ...prometheus.Collector) API {
setDefaults(cfg)

svr := server{
Expand All @@ -34,7 +34,14 @@ func NewAPI(cfg *ConsumerConfig) API {
),
}

svr.fiber.Get(*cfg.MetricConfiguration.Path, adaptor.HTTPHandler(promhttp.Handler()))
metricMiddleware, err := NewMetricMiddleware(cfg, svr.fiber, consumerMetric, metricCollectors...)

if err == nil {
svr.fiber.Use(metricMiddleware)
} else {
svr.logger.Errorf("metric middleware cannot be initialized: %v", err)
}

svr.fiber.Get(*cfg.APIConfiguration.HealthCheckPath, svr.HealthCheckHandler)

return &svr
Expand Down
11 changes: 7 additions & 4 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
"github.com/Trendyol/kafka-konsumer/instrumentation"
"github.com/segmentio/kafka-go"
)

Expand Down Expand Up @@ -38,7 +37,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
}

if cfg.APIEnabled {
c.base.setupAPI(cfg)
c.base.setupAPI(cfg, c.metric, c.base.cronsumer.GetMetricCollectors()...)
}

return &c, nil
Expand All @@ -54,6 +53,10 @@ func (b *batchConsumer) Consume() {
}
}

func (b *batchConsumer) GetMetric() *ConsumerMetric {
return b.metric
}

func (b *batchConsumer) startBatch() {
defer b.wg.Done()

Expand Down Expand Up @@ -111,10 +114,10 @@ func (b *batchConsumer) process(messages []Message) {

commitErr := b.r.CommitMessages(context.Background(), segmentioMessages...)
if commitErr != nil {
instrumentation.TotalUnprocessedBatchMessagesCounter.Inc()
b.metric.TotalUnprocessedBatchMessagesCounter++
b.logger.Error("Error Committing messages %s", commitErr.Error())
return
}

instrumentation.TotalProcessedBatchMessagesCounter.Inc()
b.metric.TotalProcessedBatchMessagesCounter++
}
96 changes: 96 additions & 0 deletions collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package kafka

import (
"github.com/ansrivas/fiberprometheus/v2"
"github.com/gofiber/fiber/v2"
"github.com/prometheus/client_golang/prometheus"
)

const Name = "kafka_konsumer_"

type metricCollector struct {
consumerMetric *ConsumerMetric

totalUnprocessedMessagesCounter *prometheus.Desc
totalProcessedMessagesCounter *prometheus.Desc
totalUnprocessedBatchMessagesCounter *prometheus.Desc
totalProcessedBatchMessagesCounter *prometheus.Desc
}

func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) {
prometheus.DescribeByCollect(s, ch)
}

func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
s.totalProcessedMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.TotalProcessedMessagesCounter),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.totalUnprocessedMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.TotalUnprocessedMessagesCounter),
[]string{}...,
)
ch <- prometheus.MustNewConstMetric(
s.totalProcessedBatchMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.TotalProcessedBatchMessagesCounter),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.totalUnprocessedBatchMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.TotalUnprocessedBatchMessagesCounter),
[]string{}...,
)
}

func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector {
return &metricCollector{
consumerMetric: consumerMetric,

totalProcessedMessagesCounter: prometheus.NewDesc(
prometheus.BuildFQName(Name, "processed_messages_total", "current"),
"Total number of processed messages.",
[]string{},
nil,
),
totalUnprocessedMessagesCounter: prometheus.NewDesc(
prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"),
"Total number of unprocessed messages.",
[]string{},
nil,
),
totalProcessedBatchMessagesCounter: prometheus.NewDesc(
prometheus.BuildFQName(Name, "processed_batch_messages_total", "current"),
"Total number of processed batch messages.",
[]string{},
nil,
),
totalUnprocessedBatchMessagesCounter: prometheus.NewDesc(
prometheus.BuildFQName(Name, "unprocessed_batch_messages_total", "current"),
"Total number of unprocessed batch messages.",
[]string{},
nil,
),
}
}

func NewMetricMiddleware(cfg *ConsumerConfig,
app *fiber.App,
consumerMetric *ConsumerMetric,
metricCollectors ...prometheus.Collector,
) (func(ctx *fiber.Ctx) error, error) {
prometheus.DefaultRegisterer.MustRegister(newMetricCollector(consumerMetric))
prometheus.DefaultRegisterer.MustRegister(metricCollectors...)

fiberPrometheus := fiberprometheus.New(cfg.Reader.GroupID)
fiberPrometheus.RegisterAt(app, *cfg.MetricConfiguration.Path)

return fiberPrometheus.Middleware, nil
}
7 changes: 3 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
"github.com/Trendyol/kafka-konsumer/instrumentation"
"github.com/segmentio/kafka-go"
)

Expand Down Expand Up @@ -32,7 +31,7 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
}

if cfg.APIEnabled {
c.base.setupAPI(cfg)
c.base.setupAPI(cfg, c.metric, c.base.cronsumer.GetMetricCollectors()...)
}

return &c, nil
Expand Down Expand Up @@ -74,10 +73,10 @@ func (c *consumer) process(message Message) {

commitErr := c.r.CommitMessages(context.Background(), kafka.Message(message))
if commitErr != nil {
instrumentation.TotalUnprocessedMessagesCounter.Inc()
c.metric.TotalUnprocessedMessagesCounter++
c.logger.Errorf("Error Committing message %s, %s", string(message.Value), commitErr.Error())
return
}

instrumentation.TotalProcessedMessagesCounter.Inc()
c.metric.TotalProcessedMessagesCounter++
}
8 changes: 6 additions & 2 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"

"github.com/prometheus/client_golang/prometheus"

cronsumer "github.com/Trendyol/kafka-cronsumer"
kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
"github.com/segmentio/kafka-go"
Expand All @@ -19,6 +21,7 @@ type base struct {
cronsumer kcronsumer.Cronsumer
api API
logger LoggerInterface
metric *ConsumerMetric
context context.Context
messageCh chan Message
quit chan struct{}
Expand Down Expand Up @@ -50,6 +53,7 @@ func newBase(cfg *ConsumerConfig) (*base, error) {
}

c := base{
metric: &ConsumerMetric{},
messageCh: make(chan Message, cfg.Concurrency),
quit: make(chan struct{}),
concurrency: cfg.Concurrency,
Expand All @@ -71,9 +75,9 @@ func (c *base) setupCronsumer(cfg *ConsumerConfig, retryFn func(kcronsumer.Messa
c.subprocesses.Add(c.cronsumer)
}

func (c *base) setupAPI(cfg *ConsumerConfig) {
func (c *base) setupAPI(cfg *ConsumerConfig, consumerMetric *ConsumerMetric, metricCollectors ...prometheus.Collector) {
c.logger.Debug("Initializing API")
c.api = NewAPI(cfg)
c.api = NewAPI(cfg, consumerMetric, metricCollectors...)
c.subprocesses.Add(c.api)
}

Expand Down
28 changes: 12 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ module github.com/Trendyol/kafka-konsumer
go 1.19

require (
github.com/Trendyol/kafka-cronsumer v1.2.1
github.com/gofiber/adaptor/v2 v2.2.1
github.com/gofiber/fiber/v2 v2.44.0
github.com/prometheus/client_golang v1.15.1
github.com/Trendyol/kafka-cronsumer v1.2.2
github.com/ansrivas/fiberprometheus/v2 v2.6.1
github.com/gofiber/fiber/v2 v2.48.0
github.com/prometheus/client_golang v1.16.0
github.com/segmentio/kafka-go v0.4.40
go.uber.org/zap v1.24.0
)
Expand All @@ -15,33 +15,29 @@ require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/gofiber/adaptor/v2 v2.2.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/compress v1.16.6 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/philhofer/fwd v1.1.2 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94 // indirect
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
github.com/tinylib/msgp v1.1.8 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.47.0 // indirect
github.com/valyala/fasthttp v1.48.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)
Loading

0 comments on commit 68f6651

Please sign in to comment.