Added periodic client refresh for e2e monitoring. #9

Open · wants to merge 2 commits into main
44 changes: 42 additions & 2 deletions docker-compose.yml
@@ -1,5 +1,5 @@
---
version: '2.1'
version: '3.9'

services:

@@ -12,6 +12,8 @@ services:
ZOOKEEPER_TICK_TIME: 2000
container_name: zookeeper
hostname: zookeeper
networks:
- kminion

kafka:
image: confluentinc/cp-kafka:latest
@@ -30,6 +32,10 @@ services:
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
links:
- zookeeper
networks:
- kminion

kafka-minion:
build:
@@ -44,4 +50,38 @@ services:
- 8080:8080
environment:
KAFKA_BROKERS: kafka:29092
restart: unless-stopped
restart: unless-stopped
links:
- kafka
networks:
- kminion

grafana:
image: grafana/grafana-oss
ports:
- '3000:3000'
volumes:
- "/tmp/grafana:/var/lib/grafana"
container_name: grafana
hostname: grafana
networks:
- kminion

prometheus:
image: prom/prometheus
ports:
- '9090:9090'
configs:
- source: prometheus
target: /etc/prometheus/prometheus.yml
container_name: prometheus
hostname: prometheus
networks:
- kminion
configs:
prometheus:
file: example/sample_prometheus.yml


networks:
kminion:
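
A quick way to sanity-check the stack once it is up is to hit kminion's metrics endpoint, which the compose file maps to localhost:8080 and which Prometheus scrapes via example/sample_prometheus.yml. A minimal Go sketch, assuming the default /metrics path; the function name and the substring check are illustrative only:

package sketch

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// checkE2EMetrics fetches kminion's Prometheus endpoint (mapped to
// localhost:8080 above) and reports whether any end_to_end series
// are exported yet.
func checkE2EMetrics() error {
	resp, err := http.Get("http://localhost:8080/metrics")
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if !strings.Contains(string(body), "end_to_end") {
		return fmt.Errorf("no end_to_end metrics found; is the e2e exporter enabled?")
	}
	fmt.Println("end_to_end metrics are being exported")
	return nil
}
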
12 changes: 7 additions & 5 deletions e2e/config.go
@@ -6,16 +6,18 @@ import (
)

type Config struct {
Enabled bool `koanf:"enabled"`
TopicManagement EndToEndTopicConfig `koanf:"topicManagement"`
ProbeInterval time.Duration `koanf:"probeInterval"`
Producer EndToEndProducerConfig `koanf:"producer"`
Consumer EndToEndConsumerConfig `koanf:"consumer"`
Enabled bool `koanf:"enabled"`
TopicManagement EndToEndTopicConfig `koanf:"topicManagement"`
ProbeInterval time.Duration `koanf:"probeInterval"`
ReconnectInterval time.Duration `koanf:"reconnectInterval"`
Producer EndToEndProducerConfig `koanf:"producer"`
Consumer EndToEndConsumerConfig `koanf:"consumer"`
}

func (c *Config) SetDefaults() {
c.Enabled = false
c.ProbeInterval = 100 * time.Millisecond
c.ReconnectInterval = 0 * time.Second
c.TopicManagement.SetDefaults()
c.Producer.SetDefaults()
c.Consumer.SetDefaults()
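
The new reconnectInterval field drives the periodic client refresh implemented in e2e/service.go below: any value greater than zero makes the service tear down and recreate its Kafka client on that interval, while the default of zero keeps the previous always-connected behaviour. A minimal sketch of setting it from Go; the module path and the five-minute value are assumptions, not part of this PR:

package sketch

import (
	"time"

	"github.com/cloudhut/kminion/v2/e2e"
)

// e2eConfigWithRefresh builds an e2e config that recreates the client
// every five minutes. The NewService/Start call sites are unchanged by
// this PR and are omitted here.
func e2eConfigWithRefresh() e2e.Config {
	cfg := e2e.Config{}
	cfg.SetDefaults() // Enabled=false, ProbeInterval=100ms, ReconnectInterval=0
	cfg.Enabled = true
	cfg.ReconnectInterval = 5 * time.Minute // any value > 0 enables reconnectLoop
	return cfg
}
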
20 changes: 17 additions & 3 deletions e2e/consumer.go
@@ -13,30 +13,42 @@ import (

func (s *Service) startConsumeMessages(ctx context.Context, initializedCh chan<- bool) {
client := s.client
logger := s.logger.Named("consumer")

s.logger.Info("Starting to consume end-to-end topic",
logger.Info("Starting to consume end-to-end topic",
zap.String("topic_name", s.config.TopicManagement.Name),
zap.String("group_id", s.groupId))

isInitialized := false
for {
if ctx.Err() != nil {
break
}
fetches := client.PollFetches(ctx)
if !isInitialized {
isInitialized = true
initializedCh <- true
}

if fetches == nil {
break
}

logger.Debug("fetching messages", zap.Any("fetches", fetches))
// Log all errors and continue afterwards as we might get errors and still have some fetch results
errors := fetches.Errors()
for _, err := range errors {
s.logger.Error("kafka fetch error",
logger.Error("kafka fetch error",
zap.String("topic", err.Topic),
zap.Int32("partition", err.Partition),
zap.Error(err.Err))
}

fetches.EachRecord(s.processMessage)
}

client.LeaveGroup()
logger.Info("Consumer thread exited")
}

func (s *Service) commitOffsets(ctx context.Context) {
@@ -75,14 +87,16 @@ func (s *Service) commitOffsets(ctx context.Context) {
// - checks if it is from us, or from another kminion process running somewhere else
// - hands it off to the service, which then reports metrics on it
func (s *Service) processMessage(record *kgo.Record) {
logger := s.logger.Named("consumer")

if record.Value == nil {
// Init messages have nil values - we want to skip these. They are only used to make sure a consumer is ready.
return
}

var msg EndToEndMessage
if jerr := json.Unmarshal(record.Value, &msg); jerr != nil {
s.logger.Error("failed to unmarshal message value", zap.Error(jerr))
logger.Error("failed to unmarshal message value", zap.Error(jerr))
return // maybe older version
}

3 changes: 3 additions & 0 deletions e2e/producer.go
@@ -22,6 +22,8 @@ func (s *Service) produceMessagesToAllPartitions(ctx context.Context) {
// it will add it to the message tracker. If producing fails a message will be logged and the respective metrics
// will be incremented.
func (s *Service) produceMessage(ctx context.Context, partition int) {
logger := s.logger.Named("producer")

topicName := s.config.TopicManagement.Name
record, msg := createEndToEndRecord(s.minionID, topicName, partition)

@@ -34,6 +36,7 @@ func (s *Service) produceMessage(ctx context.Context, partition int) {
pID := strconv.Itoa(partition)
s.messagesProducedInFlight.WithLabelValues(pID).Inc()
s.messageTracker.addToTracker(msg)
logger.Debug("producing message", zap.Any("record", record))
s.client.Produce(childCtx, record, func(r *kgo.Record, err error) {
defer cancel()
ackDuration := time.Since(startTime)
107 changes: 81 additions & 26 deletions e2e/service.go
@@ -18,8 +18,10 @@ type Service struct {
config Config
logger *zap.Logger

kafkaSvc *kafka.Service // creates kafka client for us
client *kgo.Client
kafkaSvc *kafka.Service // creates kafka client for us
client *kgo.Client
adminClient *kgo.Client
kgoOpts []kgo.Opt

// Service
minionID string // unique identifier, reported in metrics, in case multiple instances run at the same time
@@ -57,14 +59,17 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite())
}
kgoOpts = append(kgoOpts, kgo.ProduceRequestTimeout(3*time.Second))
kgoOpts = append(kgoOpts, kgo.ClientID("kminion"))

// Consumer configs
kgoOpts = append(kgoOpts,
kgo.ConsumerGroup(groupID),
kgo.ConsumeTopics(cfg.TopicManagement.Name),
kgo.Balancers(kgo.CooperativeStickyBalancer()),
kgo.DisableAutoCommit(),
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()))
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
kgo.InstanceID(groupID),
)

// Prepare hooks
hooks := newEndToEndClientHooks(logger)
@@ -73,29 +78,17 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
// We use the manual partitioner so that the records' partition id will be used as target partition
kgoOpts = append(kgoOpts, kgo.RecordPartitioner(kgo.ManualPartitioner()))

// Create kafka service and check if client can successfully connect to Kafka cluster
logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata",
zap.String("seed_brokers", strings.Join(kafkaSvc.Brokers(), ",")))
client, err := kafkaSvc.CreateAndTestClient(ctx, logger, kgoOpts)
if err != nil {
return nil, fmt.Errorf("failed to create kafka client for e2e: %w", err)
}
logger.Info("successfully connected to kafka cluster")

svc := &Service{
config: cfg,
logger: logger.Named("e2e"),
kafkaSvc: kafkaSvc,
client: client,
kgoOpts: kgoOpts,

minionID: minionID,
groupId: groupID,
clientHooks: hooks,
}

svc.groupTracker = newGroupTracker(cfg, logger, client, groupID)
svc.messageTracker = newMessageTracker(svc)

makeCounterVec := func(name string, labelNames []string, help string) *prometheus.CounterVec {
cv := prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: "end_to_end",
@@ -145,23 +138,85 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
return svc, nil
}

// Start starts the service (wow)
func (s *Service) Start(ctx context.Context) error {
func (s *Service) initKafka(ctx context.Context) error {
// Create kafka service and check if client can successfully connect to Kafka cluster
s.logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata",
zap.String("seed_brokers", strings.Join(s.kafkaSvc.Brokers(), ",")))
client, err := s.kafkaSvc.CreateAndTestClient(ctx, s.logger, s.kgoOpts)
if err != nil {
return fmt.Errorf("failed to create kafka client for e2e: %w", err)
}
s.logger.Info("successfully connected to kafka cluster")

s.client = client
s.groupTracker = newGroupTracker(s.config, s.logger, client, s.groupId)
s.messageTracker = newMessageTracker(s)

return nil
}

func (s *Service) initReconcile(ctx context.Context) error {
s.logger.Info("Starting reconcile")
// Create the KafkaAdmin client used for topic partition and leader reconciliation
adminClient, err := s.kafkaSvc.CreateAndTestClient(ctx, s.logger, []kgo.Opt{})
if err != nil {
return fmt.Errorf("failed to create kafka client for e2e: %w", err)
}

s.adminClient = adminClient

// Ensure topic exists and is configured correctly
if err := s.validateManagementTopic(ctx); err != nil {
return fmt.Errorf("could not validate end-to-end topic: %w", err)
}

// Get up-to-date metadata and inform our custom partitioner about the partition count
topicMetadata, err := s.getTopicMetadata(ctx)
if err != nil {
return fmt.Errorf("could not get topic metadata after validation: %w", err)
// start topic creation/partition/leader reconciliation loop
go s.startReconciliation(ctx)

return nil
}

// Start starts the service (wow)
func (s *Service) Start(ctx context.Context) error {
if err := s.initReconcile(ctx); err != nil {
return err
}
partitions := len(topicMetadata.Topics[0].Partitions)
s.partitionCount = partitions
if s.config.ReconnectInterval > 0*time.Second {
go s.reconnectLoop(ctx)
} else {
if err := s.run(ctx); err != nil {
return err
}

// finally start everything else (producing, consuming, continuous validation, consumer group tracking)
go s.startReconciliation(ctx)
}
return nil
}

// Stop stops the service
func (s *Service) Stop() {
s.logger.Info("Stopping e2e service")
s.client.Close()
}

func (s *Service) reconnectLoop(pctx context.Context) {
for {
ctx, _ := context.WithTimeout(pctx, s.config.ReconnectInterval)
s.run(ctx)
select {
case <-ctx.Done():
s.Stop()
fmt.Println("Restarting e2e service")
case <-pctx.Done():
s.Stop()
return
}
}
}

func (s *Service) run(ctx context.Context) error {
if err := s.initKafka(ctx); err != nil {
return err
}

// Start consumer and wait until we've received a response for the first poll which would indicate that the
// consumer is ready. Only if the consumer is ready we want to start the producer to ensure that we will not
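
The core of the change is reconnectLoop above: each cycle gets a child context that expires after ReconnectInterval, run(ctx) builds a fresh client and starts the consumer and producer against it, and once the timeout fires the client is closed and everything starts over; cancelling the parent context stops the loop for good. A standalone sketch of that pattern, with illustrative names (runOnce, teardown) that are not taken from the PR:

package sketch

import (
	"context"
	"log"
	"time"
)

// reconnectEvery runs runOnce with a context that expires after interval,
// tears the work down, and starts again until parent is cancelled.
func reconnectEvery(parent context.Context, interval time.Duration,
	runOnce func(context.Context) error, teardown func()) {
	for {
		ctx, cancel := context.WithTimeout(parent, interval)
		if err := runOnce(ctx); err != nil {
			log.Printf("run failed: %v", err)
		}
		<-ctx.Done() // the interval elapsed or the parent was cancelled
		teardown()   // e.g. close the Kafka client, as Service.Stop() does
		cancel()
		if parent.Err() != nil {
			return // parent cancelled: shut down for good
		}
		log.Println("restarting with a fresh client")
	}
}

The sketch also retains the cancel function returned by context.WithTimeout so the timer is released as soon as a cycle ends.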