Skip to content

Commit

Permalink
bulkerapp: added queue size metrics.
Browse files Browse the repository at this point in the history
bulkerapp: interrupt batch if destination config was changed
  • Loading branch information
absorbb committed Jan 23, 2025
1 parent 5f5d75f commit b3739ed
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 3 deletions.
9 changes: 9 additions & 0 deletions bulkerapp/app/abstract_batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jitsucom/bulker/jitsubase/safego"
"github.com/jitsucom/bulker/jitsubase/utils"
"github.com/jitsucom/bulker/kafkabase"
"math"
"reflect"
"strings"
"sync"
Expand Down Expand Up @@ -280,12 +281,20 @@ func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error
bc.Debugf("Consumer should not consume. offsets: %d-%d", lowOffset, highOffset)
return BatchCounters{}, nil
}
metrics.ConsumerQueueSize(bc.topicId, bc.mode, bc.destinationId, bc.tableName).Set(math.Max(float64(highOffset-lowOffset-int64(maxBatchSize)), 0))
bc.Debugf("Starting consuming messages from topic. Messages in topic: ~%d. ", highOffset-lowOffset)
batchNumber := 1
for {
if bc.retired.Load() {
return
}
if bc.destinationId != "" {
currentDst := bc.repository.GetDestination(bc.destinationId)
if currentDst == nil || currentDst.configHash != destination.configHash {
bc.Infof("Destination config has changed. Finishing this batch.")
return
}
}
batchStats, batchState, nextBatch, err2 := bc.processBatch(destination, batchNumber, maxBatchSize, retryBatchSize, highOffset)
if err2 != nil {
if nextBatch {
Expand Down
20 changes: 17 additions & 3 deletions bulkerapp/app/stream_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"fmt"
"math"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -31,9 +32,9 @@ type StreamConsumerImpl struct {

eventsLogService eventslog.EventsLogService

tableName string

closed chan struct{}
tableName string
currentOffset int64
closed chan struct{}
}

type StreamConsumer interface {
Expand Down Expand Up @@ -184,6 +185,8 @@ func (sc *StreamConsumerImpl) start() {
sc.Infof("Starting stream consumer for topic. Ver: %s", sc.destination.config.UpdatedAt)
safego.RunWithRestart(func() {
var err error
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-sc.closed:
Expand All @@ -196,6 +199,16 @@ func (sc *StreamConsumerImpl) start() {
}
sc.Infof("Closed stream state: %+v", state)
return
case <-ticker.C:
if sc.currentOffset > 0 {
_, highOffset, err := sc.consumer.QueryWatermarkOffsets(sc.topicId, 0, 10_000)
if err != nil {
sc.Errorf("Error querying watermark offsets: %v", err)
metrics.ConsumerErrors(sc.topicId, "stream", sc.destination.Id(), sc.tableName, "query_watermark_failed").Inc()
} else {
metrics.ConsumerQueueSize(sc.topicId, "stream", sc.destination.Id(), sc.tableName).Set(math.Max(float64(highOffset-sc.currentOffset-1), 0))
}
}
default:
var message *kafka.Message
message, err = sc.consumer.ReadMessage(streamConsumerMessageWaitTimeout)
Expand All @@ -214,6 +227,7 @@ func (sc *StreamConsumerImpl) start() {
}
continue
}
sc.currentOffset = int64(message.TopicPartition.Offset)
metricsMeta := kafkabase.GetKafkaHeader(message, MetricsMetaHeader)
metrics.ConsumerMessages(sc.topicId, "stream", sc.destination.Id(), sc.tableName, "consumed").Inc()
var obj types.Object
Expand Down
9 changes: 9 additions & 0 deletions bulkerapp/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ var (
return consumerMessages.WithLabelValues(topicId, mode, destinationId, tableName, status)
}

consumerQueueSize = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "bulkerapp",
Subsystem: "consumer",
Name: "queue_size",
}, []string{"topicId", "mode", "destinationId", "tableName"})
ConsumerQueueSize = func(topicId, mode, destinationId, tableName string) prometheus.Gauge {
return consumerQueueSize.WithLabelValues(topicId, mode, destinationId, tableName)
}

consumerRuns = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "bulkerapp",
Subsystem: "consumer",
Expand Down

0 comments on commit b3739ed

Please sign in to comment.