Merge branch 'v2' into feature/kafka-header-filter
# Conflicts:
#	README.md
#	consumer_base.go
dilaragorum committed Mar 27, 2024
2 parents a43b245 + 474c038 commit 88fe363
Showing 20 changed files with 459 additions and 99 deletions.
Binary file modified .github/images/grafana.png
96 changes: 49 additions & 47 deletions README.md

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions batch_consumer.go
@@ -3,6 +3,8 @@ package kafka
 import (
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+
 	"github.com/segmentio/kafka-go"
 
 	kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
@@ -51,8 +53,8 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
 	return &c, nil
 }
 
-func (b *batchConsumer) GetMetric() *ConsumerMetric {
-	return b.metric
+func (b *batchConsumer) GetMetricCollectors() []prometheus.Collector {
+	return b.base.GetMetricCollectors()
 }
 
 func (b *batchConsumer) Consume() {
50 changes: 27 additions & 23 deletions collector.go
@@ -8,20 +8,43 @@ import (
 
 const Name = "kafka_konsumer"
 
-type metricCollector struct {
+type MetricCollector struct {
 	consumerMetric *ConsumerMetric
 
 	totalUnprocessedMessagesCounter *prometheus.Desc
 	totalProcessedMessagesCounter   *prometheus.Desc
 }
 
-func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) {
+func NewMetricCollector(metricPrefix string, consumerMetric *ConsumerMetric) *MetricCollector {
+	if metricPrefix == "" {
+		metricPrefix = Name
+	}
+
+	return &MetricCollector{
+		consumerMetric: consumerMetric,
+
+		totalProcessedMessagesCounter: prometheus.NewDesc(
+			prometheus.BuildFQName(metricPrefix, "processed_messages_total", "current"),
+			"Total number of processed messages.",
+			emptyStringList,
+			nil,
+		),
+		totalUnprocessedMessagesCounter: prometheus.NewDesc(
+			prometheus.BuildFQName(metricPrefix, "unprocessed_messages_total", "current"),
+			"Total number of unprocessed messages.",
+			emptyStringList,
+			nil,
+		),
+	}
+}
+
+func (s *MetricCollector) Describe(ch chan<- *prometheus.Desc) {
 	prometheus.DescribeByCollect(s, ch)
 }
 
 var emptyStringList []string
 
-func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
+func (s *MetricCollector) Collect(ch chan<- prometheus.Metric) {
 	ch <- prometheus.MustNewConstMetric(
 		s.totalProcessedMessagesCounter,
 		prometheus.CounterValue,
@@ -37,31 +60,12 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
 	)
 }
 
-func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector {
-	return &metricCollector{
-		consumerMetric: consumerMetric,
-
-		totalProcessedMessagesCounter: prometheus.NewDesc(
-			prometheus.BuildFQName(Name, "processed_messages_total", "current"),
-			"Total number of processed messages.",
-			emptyStringList,
-			nil,
-		),
-		totalUnprocessedMessagesCounter: prometheus.NewDesc(
-			prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"),
-			"Total number of unprocessed messages.",
-			emptyStringList,
-			nil,
-		),
-	}
-}
-
 func NewMetricMiddleware(cfg *ConsumerConfig,
 	app *fiber.App,
 	consumerMetric *ConsumerMetric,
 	metricCollectors ...prometheus.Collector,
 ) (func(ctx *fiber.Ctx) error, error) {
-	prometheus.DefaultRegisterer.MustRegister(newMetricCollector(consumerMetric))
+	prometheus.DefaultRegisterer.MustRegister(NewMetricCollector(cfg.MetricPrefix, consumerMetric))
	prometheus.DefaultRegisterer.MustRegister(metricCollectors...)
 
 	fiberPrometheus := fiberprometheus.New(cfg.Reader.GroupID)
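
Since the collector type and its constructor are now exported, callers that do not use the built-in Fiber middleware can build and register the collector themselves. A minimal sketch, not part of this commit; the prefix, port, and module import path are assumptions:

package main

import (
	"net/http"

	kafka "github.com/Trendyol/kafka-konsumer/v2" // module path assumed
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// ConsumerMetric would normally come from a running consumer; a zero value
	// is used here only to show the wiring.
	metric := &kafka.ConsumerMetric{}

	// Passing "" would fall back to the default "kafka_konsumer" prefix.
	collector := kafka.NewMetricCollector("my_app", metric)
	prometheus.MustRegister(collector)

	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil)
}
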
59 changes: 59 additions & 0 deletions collector_test.go
@@ -0,0 +1,59 @@
+package kafka
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+func Test_NewCollector(t *testing.T) {
+	t.Run("When_Default_Prefix_Value_Used", func(t *testing.T) {
+		cronsumerMetric := &ConsumerMetric{}
+		expectedTotalProcessedMessagesCounter := prometheus.NewDesc(
+			prometheus.BuildFQName(Name, "processed_messages_total", "current"),
+			"Total number of processed messages.",
+			emptyStringList,
+			nil,
+		)
+		expectedTotalUnprocessedMessagesCounter := prometheus.NewDesc(
+			prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"),
+			"Total number of unprocessed messages.",
+			emptyStringList,
+			nil,
+		)
+
+		collector := NewMetricCollector("", cronsumerMetric)
+
+		if !reflect.DeepEqual(collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) {
+			t.Errorf("Expected: %+v, Actual: %+v", collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter)
+		}
+		if !reflect.DeepEqual(collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) {
+			t.Errorf("Expected: %+v, Actual: %+v", collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter)
+		}
+	})
+	t.Run("When_Custom_Prefix_Value_Used", func(t *testing.T) {
+		cronsumerMetric := &ConsumerMetric{}
+		expectedTotalProcessedMessagesCounter := prometheus.NewDesc(
+			prometheus.BuildFQName("custom_prefix", "processed_messages_total", "current"),
+			"Total number of processed messages.",
+			emptyStringList,
+			nil,
+		)
+		expectedTotalUnprocessedMessagesCounter := prometheus.NewDesc(
+			prometheus.BuildFQName("custom_prefix", "unprocessed_messages_total", "current"),
+			"Total number of unprocessed messages.",
+			emptyStringList,
+			nil,
+		)
+
+		collector := NewMetricCollector("custom_prefix", cronsumerMetric)
+
+		if !reflect.DeepEqual(collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) {
+			t.Errorf("Expected: %+v, Actual: %+v", collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter)
+		}
+		if !reflect.DeepEqual(collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) {
+			t.Errorf("Expected: %+v, Actual: %+v", collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter)
+		}
+	})
+}
6 changes: 6 additions & 0 deletions consumer.go
@@ -3,6 +3,8 @@ package kafka
 import (
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+
 	"github.com/segmentio/kafka-go"
 
 	kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
@@ -46,6 +48,10 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
 	return &c, nil
 }
 
+func (c *consumer) GetMetricCollectors() []prometheus.Collector {
+	return c.base.GetMetricCollectors()
+}
+
 func (c *consumer) Consume() {
 	go c.subprocesses.Start()
 
19 changes: 19 additions & 0 deletions consumer_base.go
@@ -26,6 +26,11 @@ type Consumer interface {
 	// Resume function resumes consumer, it is start to working
 	Resume()
 
+	// GetMetricCollectors for the purpose of making metric collectors available.
+	// You can register these collectors on your own http server.
+	// Please look at the examples/with-metric-collector directory.
+	GetMetricCollectors() []prometheus.Collector
+
 	// WithLogger for injecting custom log implementation
 	WithLogger(logger LoggerInterface)
 
@@ -72,6 +77,7 @@ type base struct {
 	transactionalRetry        bool
 	distributedTracingEnabled bool
 	consumerState             state
+	metricPrefix              string
 }
 
 func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
@@ -109,6 +115,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
 		batchConsumingStream:  make(chan []*Message, cfg.Concurrency),
 		consumerState:         stateRunning,
 		skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn,
+		metricPrefix:          cfg.MetricPrefix,
 	}
 
 	if cfg.DistributedTracingEnabled {
@@ -127,6 +134,18 @@ func (c *base) setupCronsumer(cfg *ConsumerConfig, retryFn func(kcronsumer.Messa
 	c.subprocesses.Add(c.cronsumer)
 }
 
+func (c *base) GetMetricCollectors() []prometheus.Collector {
+	var metricCollectors []prometheus.Collector
+
+	if c.retryEnabled {
+		metricCollectors = c.cronsumer.GetMetricCollectors()
+	}
+
+	metricCollectors = append(metricCollectors, NewMetricCollector(c.metricPrefix, c.metric))
+
+	return metricCollectors
+}
+
 func (c *base) setupAPI(cfg *ConsumerConfig, consumerMetric *ConsumerMetric) {
 	c.logger.Debug("Initializing API")
 
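
As the new interface comment suggests, the returned collectors can be served from a caller-owned HTTP endpoint instead of the built-in API. A rough sketch, assuming an already constructed consumer, an arbitrarily chosen address, and an assumed module path (see examples/with-metric-collector for the commit's own example):

package metrics

import (
	"net/http"

	kafka "github.com/Trendyol/kafka-konsumer/v2" // module path assumed
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Serve registers the consumer's collectors (the konsumer metrics plus, when
// retry is enabled, the cronsumer metrics) on a dedicated registry and exposes
// them on /metrics.
func Serve(c kafka.Consumer, addr string) error {
	registry := prometheus.NewRegistry()
	registry.MustRegister(c.GetMetricCollectors()...)

	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	return http.ListenAndServe(addr, mux)
}

In an application this would typically run in its own goroutine alongside consumer.Consume().
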
19 changes: 17 additions & 2 deletions consumer_config.go
@@ -51,12 +51,20 @@ type ConsumerConfig struct {
 	DistributedTracingEnabled bool
 	RetryEnabled              bool
 	APIEnabled                bool
+
+	// MetricPrefix is used for prometheus fq name prefix.
+	// If not provided, default metric prefix value is `kafka_konsumer`.
+	// Currently, there are two exposed prometheus metrics. `processed_messages_total_current` and `unprocessed_messages_total_current`.
+	// So, if default metric prefix used, metrics names are `kafka_konsumer_processed_messages_total_current` and
+	// `kafka_konsumer_unprocessed_messages_total_current`.
+	MetricPrefix string
 }
 
 func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
 	cronsumerCfg := kcronsumer.Config{
-		ClientID: cfg.RetryConfiguration.ClientID,
-		Brokers:  cfg.RetryConfiguration.Brokers,
+		MetricPrefix: cfg.RetryConfiguration.MetricPrefix,
+		ClientID:     cfg.RetryConfiguration.ClientID,
+		Brokers:      cfg.RetryConfiguration.Brokers,
 		Consumer: kcronsumer.ConsumerConfig{
 			ClientID: cfg.ClientID,
 			GroupID:  cfg.Reader.GroupID,
@@ -131,6 +139,13 @@ func toHeaders(cronsumerHeaders []kcronsumer.Header) []Header {
 }
 
 type RetryConfiguration struct {
+	// MetricPrefix is used for prometheus fq name prefix.
+	// If not provided, default metric prefix value is `kafka_cronsumer`.
+	// Currently, there are two exposed prometheus metrics. `retried_messages_total_current` and `discarded_messages_total_current`.
+	// So, if default metric prefix used, metrics names are `kafka_cronsumer_retried_messages_total_current` and
+	// `kafka_cronsumer_discarded_messages_total_current`.
+	MetricPrefix string
+
 	SASL     *SASLConfig
 	TLS      *TLSConfig
 	ClientID string
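
Taken together, the two new fields sit at different levels of the configuration: ConsumerConfig.MetricPrefix renames the konsumer series, while RetryConfiguration.MetricPrefix renames the cronsumer retry series. A sketch with placeholder prefix, broker, and client id values; the remaining reader, consume, and retry settings are omitted for brevity and the module path is assumed:

package example

import kafka "github.com/Trendyol/kafka-konsumer/v2" // module path assumed

// newConsumerConfig shows where the new prefix fields live.
func newConsumerConfig() *kafka.ConsumerConfig {
	return &kafka.ConsumerConfig{
		// Replaces the default "kafka_konsumer" prefix, so the exposed series
		// become my_app_processed_messages_total_current and
		// my_app_unprocessed_messages_total_current.
		MetricPrefix: "my_app",

		RetryEnabled: true,
		RetryConfiguration: kafka.RetryConfiguration{
			// Replaces the default "kafka_cronsumer" prefix of the retry metrics.
			MetricPrefix: "my_app_retry",
			Brokers:      []string{"localhost:29092"}, // placeholder broker
			ClientID:     "my-consumer-retry",         // placeholder
		},
	}
}
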
1 change: 1 addition & 0 deletions examples/docker-compose.yml
@@ -49,6 +49,7 @@ services:
     command: "bash -c 'echo Waiting for Kafka to be ready... && \
               cub kafka-ready -b kafka:9092 1 20 && \
               kafka-topics --create --topic standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
+              kafka-topics --create --topic another-standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
               kafka-topics --create --topic retry-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
               kafka-topics --create --topic error-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
               sleep infinity'"