From 2a2054c1f80d3869ed7de7b5b6c8fc08c54b5624 Mon Sep 17 00:00:00 2001 From: Adrian Coman <1664229+azun@users.noreply.github.com> Date: Mon, 25 Sep 2023 23:51:12 +0300 Subject: [PATCH 1/2] Added periodic client refresh for e2e monitoring. Fixed consumer group monitoring flag Fixed topic management reconciliation --- docker-compose.yml | 44 +++++++- e2e/config.go | 12 +- e2e/consumer.go | 20 +++- e2e/producer.go | 3 + e2e/service.go | 106 +++++++++++++----- e2e/topic.go | 130 +++++++++++++++++----- prometheus/collect_consumer_group_lags.go | 3 + 7 files changed, 255 insertions(+), 63 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index aae9b32..6c149b7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,5 @@ --- -version: '2.1' +version: '3.9' services: @@ -12,6 +12,8 @@ services: ZOOKEEPER_TICK_TIME: 2000 container_name: zookeeper hostname: zookeeper + networks: + - kminion kafka: image: confluentinc/cp-kafka:latest @@ -30,6 +32,10 @@ services: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + links: + - zookeeper + networks: + - kminion kafka-minion: build: @@ -44,4 +50,38 @@ services: - 8080:8080 environment: KAFKA_BROKERS: kafka:29092 - restart: unless-stopped \ No newline at end of file + restart: unless-stopped + links: + - kafka + networks: + - kminion + + grafana: + image: grafana/grafana-oss + ports: + - '3000:3000' + volumes: + - "/tmp/grafana:/var/lib/grafana" + container_name: grafana + hostname: grafana + networks: + - kminion + + prometheus: + image: prom/prometheus + ports: + - '9090:9090' + configs: + - source: prometheus + target: /etc/prometheus/prometheus.yml + container_name: prometheus + hostname: prometheus + networks: + - kminion +configs: + prometheus: + file: example/sample_prometheus.yml + + +networks: + kminion: diff --git a/e2e/config.go b/e2e/config.go index 9c54419..e644e17 100644 --- a/e2e/config.go +++ b/e2e/config.go @@ -6,16 +6,18 @@ import ( ) type Config struct { - Enabled bool `koanf:"enabled"` - TopicManagement EndToEndTopicConfig `koanf:"topicManagement"` - ProbeInterval time.Duration `koanf:"probeInterval"` - Producer EndToEndProducerConfig `koanf:"producer"` - Consumer EndToEndConsumerConfig `koanf:"consumer"` + Enabled bool `koanf:"enabled"` + TopicManagement EndToEndTopicConfig `koanf:"topicManagement"` + ProbeInterval time.Duration `koanf:"probeInterval"` + ReconnectInterval time.Duration `koanf:"reconnectInterval"` + Producer EndToEndProducerConfig `koanf:"producer"` + Consumer EndToEndConsumerConfig `koanf:"consumer"` } func (c *Config) SetDefaults() { c.Enabled = false c.ProbeInterval = 100 * time.Millisecond + c.ReconnectInterval = 0 * time.Second c.TopicManagement.SetDefaults() c.Producer.SetDefaults() c.Consumer.SetDefaults() diff --git a/e2e/consumer.go b/e2e/consumer.go index 865ca8a..75222f6 100644 --- a/e2e/consumer.go +++ b/e2e/consumer.go @@ -13,23 +13,32 @@ import ( func (s *Service) startConsumeMessages(ctx context.Context, initializedCh chan<- bool) { client := s.client + logger := s.logger.Named("consumer") - s.logger.Info("Starting to consume end-to-end topic", + logger.Info("Starting to consume end-to-end topic", zap.String("topic_name", s.config.TopicManagement.Name), zap.String("group_id", s.groupId)) isInitialized := false for { + if ctx.Err() != nil { + break + } fetches := client.PollFetches(ctx) if !isInitialized { isInitialized = true initializedCh <- true } + if fetches == nil { + break + } + + 
logger.Debug("fetching messages", zap.Any("fetches", fetches)) // Log all errors and continue afterwards as we might get errors and still have some fetch results errors := fetches.Errors() for _, err := range errors { - s.logger.Error("kafka fetch error", + logger.Error("kafka fetch error", zap.String("topic", err.Topic), zap.Int32("partition", err.Partition), zap.Error(err.Err)) @@ -37,6 +46,9 @@ func (s *Service) startConsumeMessages(ctx context.Context, initializedCh chan<- fetches.EachRecord(s.processMessage) } + + client.LeaveGroup() + logger.Info("Consumer thread exited") } func (s *Service) commitOffsets(ctx context.Context) { @@ -75,6 +87,8 @@ func (s *Service) commitOffsets(ctx context.Context) { // - checks if it is from us, or from another kminion process running somewhere else // - hands it off to the service, which then reports metrics on it func (s *Service) processMessage(record *kgo.Record) { + logger := s.logger.Named("consumer") + if record.Value == nil { // Init messages have nil values - we want to skip these. They are only used to make sure a consumer is ready. return @@ -82,7 +96,7 @@ func (s *Service) processMessage(record *kgo.Record) { var msg EndToEndMessage if jerr := json.Unmarshal(record.Value, &msg); jerr != nil { - s.logger.Error("failed to unmarshal message value", zap.Error(jerr)) + logger.Error("failed to unmarshal message value", zap.Error(jerr)) return // maybe older version } diff --git a/e2e/producer.go b/e2e/producer.go index 36a2495..9a09775 100644 --- a/e2e/producer.go +++ b/e2e/producer.go @@ -22,6 +22,8 @@ func (s *Service) produceMessagesToAllPartitions(ctx context.Context) { // it will add it to the message tracker. If producing fails a message will be logged and the respective metrics // will be incremented. 
func (s *Service) produceMessage(ctx context.Context, partition int) { + logger := s.logger.Named("producer") + topicName := s.config.TopicManagement.Name record, msg := createEndToEndRecord(s.minionID, topicName, partition) @@ -34,6 +36,7 @@ func (s *Service) produceMessage(ctx context.Context, partition int) { pID := strconv.Itoa(partition) s.messagesProducedInFlight.WithLabelValues(pID).Inc() s.messageTracker.addToTracker(msg) + logger.Debug("producing message", zap.Any("record", record)) s.client.Produce(childCtx, record, func(r *kgo.Record, err error) { defer cancel() ackDuration := time.Since(startTime) diff --git a/e2e/service.go b/e2e/service.go index 6f53070..1a09309 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -18,8 +18,10 @@ type Service struct { config Config logger *zap.Logger - kafkaSvc *kafka.Service // creates kafka client for us - client *kgo.Client + kafkaSvc *kafka.Service // creates kafka client for us + client *kgo.Client + adminClient *kgo.Client + kgoOpts []kgo.Opt // Service minionID string // unique identifier, reported in metrics, in case multiple instances run at the same time @@ -57,6 +59,7 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite()) } kgoOpts = append(kgoOpts, kgo.ProduceRequestTimeout(3*time.Second)) + kgoOpts = append(kgoOpts, kgo.ClientID("kminion")) // Consumer configs kgoOpts = append(kgoOpts, @@ -64,7 +67,9 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k kgo.ConsumeTopics(cfg.TopicManagement.Name), kgo.Balancers(kgo.CooperativeStickyBalancer()), kgo.DisableAutoCommit(), - kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd())) + kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()), + kgo.InstanceID(groupID), + ) // Prepare hooks hooks := newEndToEndClientHooks(logger) @@ -73,29 +78,17 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k // We use the manual partitioner so that the records' partition id will be used as target partition kgoOpts = append(kgoOpts, kgo.RecordPartitioner(kgo.ManualPartitioner())) - // Create kafka service and check if client can successfully connect to Kafka cluster - logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata", - zap.String("seed_brokers", strings.Join(kafkaSvc.Brokers(), ","))) - client, err := kafkaSvc.CreateAndTestClient(ctx, logger, kgoOpts) - if err != nil { - return nil, fmt.Errorf("failed to create kafka client for e2e: %w", err) - } - logger.Info("successfully connected to kafka cluster") - svc := &Service{ config: cfg, logger: logger.Named("e2e"), kafkaSvc: kafkaSvc, - client: client, + kgoOpts: kgoOpts, minionID: minionID, groupId: groupID, clientHooks: hooks, } - svc.groupTracker = newGroupTracker(cfg, logger, client, groupID) - svc.messageTracker = newMessageTracker(svc) - makeCounterVec := func(name string, labelNames []string, help string) *prometheus.CounterVec { cv := prometheus.NewCounterVec(prometheus.CounterOpts{ Subsystem: "end_to_end", @@ -145,24 +138,85 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k return svc, nil } -// Start starts the service (wow) -func (s *Service) Start(ctx context.Context) error { - // Ensure topic exists and is configured correctly - if err := s.validateManagementTopic(ctx); err != nil { - return fmt.Errorf("could not validate end-to-end topic: %w", err) +func (s *Service) initKafka(ctx context.Context) error { + // Create kafka service and check if 
client can successfully connect to Kafka cluster + s.logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata", + zap.String("seed_brokers", strings.Join(s.kafkaSvc.Brokers(), ","))) + client, err := s.kafkaSvc.CreateAndTestClient(ctx, s.logger, s.kgoOpts) + if err != nil { + return fmt.Errorf("failed to create kafka client for e2e: %w", err) } + s.logger.Info("successfully connected to kafka cluster") - // Get up-to-date metadata and inform our custom partitioner about the partition count - topicMetadata, err := s.getTopicMetadata(ctx) + s.client = client + s.groupTracker = newGroupTracker(s.config, s.logger, client, s.groupId) + s.messageTracker = newMessageTracker(s) + + return nil +} + +func (s *Service) initReconcile(ctx context.Context) error { + s.logger.Info("Starting reconcile") + adminClient, err := s.kafkaSvc.CreateAndTestClient(ctx, s.logger, []kgo.Opt{}) if err != nil { - return fmt.Errorf("could not get topic metadata after validation: %w", err) + return fmt.Errorf("failed to create kafka client for e2e: %w", err) + } + + s.adminClient = adminClient + + // Ensure topic exists and is configured correctly + if err := s.validateManagementTopic(ctx); err != nil { + return fmt.Errorf("could not validate end-to-end topic: %w", err) } - partitions := len(topicMetadata.Topics[0].Partitions) - s.partitionCount = partitions // finally start everything else (producing, consuming, continuous validation, consumer group tracking) go s.startReconciliation(ctx) + return nil +} + +// Start starts the service (wow) +func (s *Service) Start(ctx context.Context) error { + if err := s.initReconcile(ctx); err != nil { + return err + } + if s.config.ReconnectInterval > 0*time.Second { + go s.reconnectLoop(ctx) + } else { + if err := s.run(ctx); err != nil { + return err + } + + } + return nil +} + +// Stop stops the service +func (s *Service) Stop() { + s.logger.Info("Stopping e2e service") + s.client.Close() +} + +func (s *Service) reconnectLoop(pctx context.Context) { + for { + ctx, _ := context.WithTimeout(pctx, s.config.ReconnectInterval) + s.run(ctx) + select { + case <-ctx.Done(): + s.Stop() + fmt.Println("Restarting e2e service") + case <-pctx.Done(): + s.Stop() + return + } + } +} + +func (s *Service) run(ctx context.Context) error { + if err := s.initKafka(ctx); err != nil { + return err + } + // Start consumer and wait until we've received a response for the first poll which would indicate that the // consumer is ready. Only if the consumer is ready we want to start the producer to ensure that we will not // miss messages because the consumer wasn't ready. diff --git a/e2e/topic.go b/e2e/topic.go index d54291a..f5958ac 100644 --- a/e2e/topic.go +++ b/e2e/topic.go @@ -14,13 +14,15 @@ import ( // Check our end-to-end test topic and adapt accordingly if something does not match our expectations. // - does it exist? // - is it configured correctly? -// - does it have enough partitions? -// - is the replicationFactor correct? +// - does it have enough partitions? +// - is the replicationFactor correct? +// // - are assignments good? -// - is each broker leading at least one partition? -// - are replicas distributed correctly? +// - is each broker leading at least one partition? +// - are replicas distributed correctly? 
func (s *Service) validateManagementTopic(ctx context.Context) error { - s.logger.Debug("validating end-to-end topic...") + logger := s.logger.Named("ManagementTopic") + logger.Info("validating end-to-end topic...") meta, err := s.getTopicMetadata(ctx) if err != nil { @@ -53,7 +55,17 @@ func (s *Service) validateManagementTopic(ctx context.Context) error { } } - alterReq, createReq, err := s.calculatePartitionReassignments(meta) + if !s.config.TopicManagement.Enabled { + topicMetadata, err := s.getTopicMetadata(ctx) + if err != nil { + return fmt.Errorf("could not get topic metadata after validation: %w", err) + } + partitions := len(topicMetadata.Topics[0].Partitions) + s.partitionCount = partitions + return nil + } + + alterReq, createReq, pleReq, err := s.calculatePartitionReassignments(meta) if err != nil { return fmt.Errorf("failed to calculate partition reassignments: %w", err) } @@ -68,6 +80,42 @@ func (s *Service) validateManagementTopic(ctx context.Context) error { return fmt.Errorf("failed to create partitions: %w", err) } + err = s.executeLeaderElection(ctx, pleReq) + if err != nil { + return fmt.Errorf("failed to elect partitions: %w", err) + } + + topicMetadata, err := s.getTopicMetadata(ctx) + if err != nil { + return fmt.Errorf("could not get topic metadata after validation: %w", err) + } + partitions := len(topicMetadata.Topics[0].Partitions) + s.partitionCount = partitions + + logger.Info("end-to-end topic is valid.") + + return nil +} + +func (s *Service) executeLeaderElection(ctx context.Context, req *kmsg.ElectLeadersRequest) error { + if req == nil { + return nil + } + + res, err := req.RequestWith(ctx, s.adminClient) + if err != nil { + return err + } + + for _, topic := range res.Topics { + for _, partition := range topic.Partitions { + typedErr := kerr.TypedErrorForCode(partition.ErrorCode) + if typedErr != nil { + return fmt.Errorf("inner Kafka error: %w", typedErr) + } + } + } + return nil } @@ -76,7 +124,7 @@ func (s *Service) executeCreatePartitions(ctx context.Context, req *kmsg.CreateP return nil } - res, err := req.RequestWith(ctx, s.client) + res, err := req.RequestWith(ctx, s.adminClient) if err != nil { return err } @@ -84,7 +132,7 @@ func (s *Service) executeCreatePartitions(ctx context.Context, req *kmsg.CreateP for _, topic := range res.Topics { typedErr := kerr.TypedErrorForCode(topic.ErrorCode) if typedErr != nil { - return fmt.Errorf("inner Kafka error: %w", err) + return fmt.Errorf("inner Kafka error: %w", typedErr) } } @@ -96,20 +144,20 @@ func (s *Service) executeAlterPartitionAssignments(ctx context.Context, req *kms return nil } - res, err := req.RequestWith(ctx, s.client) + res, err := req.RequestWith(ctx, s.adminClient) if err != nil { return err } typedErr := kerr.TypedErrorForCode(res.ErrorCode) if typedErr != nil { - return fmt.Errorf("inner Kafka error: %w", err) + return fmt.Errorf("inner Kafka error: %w", typedErr) } for _, topic := range res.Topics { for _, partition := range topic.Partitions { typedErr = kerr.TypedErrorForCode(partition.ErrorCode) if typedErr != nil { - return fmt.Errorf("inner Kafka partition error on partition '%v': %w", partition.Partition, err) + return fmt.Errorf("inner Kafka partition error on partition '%v': %w", partition.Partition, typedErr) } } } @@ -117,19 +165,28 @@ func (s *Service) executeAlterPartitionAssignments(ctx context.Context, req *kms return nil } -func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (*kmsg.AlterPartitionAssignmentsRequest, *kmsg.CreatePartitionsRequest, 
error) { +func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (*kmsg.AlterPartitionAssignmentsRequest, *kmsg.CreatePartitionsRequest, *kmsg.ElectLeadersRequest, error) { brokerByID := brokerMetadataByBrokerID(meta.Brokers) topicMeta := meta.Topics[0] desiredReplicationFactor := s.config.TopicManagement.ReplicationFactor + desiredPartitionsPerBroker := s.config.TopicManagement.PartitionsPerBroker if desiredReplicationFactor > len(brokerByID) { - return nil, nil, fmt.Errorf("the desired replication factor of '%v' is larger than the available brokers "+ + return nil, nil, nil, fmt.Errorf("the desired replication factor of '%v' is larger than the available brokers "+ "('%v' brokers)", desiredReplicationFactor, len(brokerByID)) } + partitionLeaderElections := []int32{} + for _, partition := range topicMeta.Partitions { + if partition.Replicas[0] != partition.Leader { + partitionLeaderElections = append(partitionLeaderElections, partition.Partition) + } + } + // We want to ensure that each brokerID leads at least one partition permanently. Hence let's iterate over brokers. preferredLeaderPartitionsBrokerID := make(map[int32][]kmsg.MetadataResponseTopicPartition) for _, broker := range brokerByID { + preferredLeaderPartitionsBrokerID[broker.NodeID] = []kmsg.MetadataResponseTopicPartition{} for _, partition := range topicMeta.Partitions { // PreferredLeader = BrokerID of the brokerID that is the desired leader. Regardless who the current leader is preferredLeader := partition.Replicas[0] @@ -139,11 +196,11 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) ( } } - // Partitions that use the same brokerID more than once as preferred leader can be reassigned to other brokers + // Partitions that use the same brokerID more than desiredPartitionsPerBroker as preferred leader can be reassigned to other brokers // We collect them to avoid creating new partitions when not needed. reassignablePartitions := make([]kmsg.MetadataResponseTopicPartition, 0) for _, partitions := range preferredLeaderPartitionsBrokerID { - if len(partitions) > 1 { + if len(partitions) > desiredPartitionsPerBroker { reassignablePartitions = append(reassignablePartitions, partitions[1:]...) 
continue } @@ -155,7 +212,6 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) ( partitionCount := len(topicMeta.Partitions) partitionReassignments := make([]kmsg.AlterPartitionAssignmentsRequestTopicPartition, 0) createPartitionAssignments := make([]kmsg.CreatePartitionsRequestTopicAssignment, 0) - for brokerID, partitions := range preferredLeaderPartitionsBrokerID { // Add replicas if number of replicas is smaller than desiredReplicationFactor for _, partition := range partitions { @@ -168,7 +224,7 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) ( } // TODO: Consider more than one partition per broker config - if len(partitions) != 0 { + if len(partitions) >= desiredPartitionsPerBroker { continue } @@ -183,11 +239,13 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) ( reassignablePartitions = reassignablePartitions[1:] } - // Create a new partition for this broker - partitionCount++ - assignmentReq := kmsg.NewCreatePartitionsRequestTopicAssignment() - assignmentReq.Replicas = s.calculateAppropriateReplicas(meta, desiredReplicationFactor, brokerByID[brokerID]) - createPartitionAssignments = append(createPartitionAssignments, assignmentReq) + // Create new partitions for this broker + for i := 0; i < desiredPartitionsPerBroker-len(partitions); i++ { + partitionCount++ + assignmentReq := kmsg.NewCreatePartitionsRequestTopicAssignment() + assignmentReq.Replicas = s.calculateAppropriateReplicas(meta, desiredReplicationFactor, brokerByID[brokerID]) + createPartitionAssignments = append(createPartitionAssignments, assignmentReq) + } } var reassignmentReq *kmsg.AlterPartitionAssignmentsRequest @@ -195,7 +253,7 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) ( s.logger.Info("e2e probe topic has to be modified due to missing replicas or wrong preferred leader assignments", zap.Int("partition_count", len(topicMeta.Partitions)), zap.Int("broker_count", len(meta.Brokers)), - zap.Int("config_partitions_per_broker", s.config.TopicManagement.PartitionsPerBroker), + zap.Int("config_partitions_per_broker", desiredPartitionsPerBroker), zap.Int("config_replication_factor", s.config.TopicManagement.ReplicationFactor), zap.Int("partitions_to_reassign", len(partitionReassignments)), ) @@ -215,6 +273,7 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) ( zap.Int("broker_count", len(meta.Brokers)), zap.Int("config_partitions_per_broker", s.config.TopicManagement.PartitionsPerBroker), zap.Int("partitions_to_add", len(createPartitionAssignments)), + zap.Any("partitions", createPartitionAssignments), ) r := kmsg.NewCreatePartitionsRequest() createPartitionsTopicReq := kmsg.NewCreatePartitionsRequestTopic() @@ -225,7 +284,24 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) ( createReq = &r } - return reassignmentReq, createReq, nil + var pleReq *kmsg.ElectLeadersRequest + if len(partitionLeaderElections) > 0 { + s.logger.Info("e2e probe topic leaders are not preferred", + zap.Int("partitions_to_elect", len(partitionLeaderElections)), + ) + r := kmsg.NewElectLeadersRequest() + electLeadersRequestTopic := kmsg.NewElectLeadersRequestTopic() + electLeadersRequestTopic.Topic = s.config.TopicManagement.Name + electLeadersRequestTopic.Partitions = partitionLeaderElections + r.Topics = []kmsg.ElectLeadersRequestTopic{electLeadersRequestTopic} + r.ElectionType = 0 // preferred + pleReq = &r + } + + // update partition count for e2e 
test + s.partitionCount = partitionCount + + return reassignmentReq, createReq, pleReq, nil } // calculateAppropriateReplicas returns the best possible brokerIDs that shall be used as replicas. @@ -299,7 +375,7 @@ func (s *Service) createManagementTopic(ctx context.Context, allMeta *kmsg.Metad req := kmsg.NewCreateTopicsRequest() req.Topics = []kmsg.CreateTopicsRequestTopic{topic} - res, err := req.RequestWith(ctx, s.client) + res, err := req.RequestWith(ctx, s.adminClient) if err != nil { return fmt.Errorf("failed to create e2e topic: %w", err) } @@ -320,7 +396,7 @@ func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse, req := kmsg.NewMetadataRequest() req.Topics = []kmsg.MetadataRequestTopic{topicReq} - return req.RequestWith(ctx, s.client) + return req.RequestWith(ctx, s.adminClient) } func (s *Service) getTopicsConfigs(ctx context.Context, configNames []string) (*kmsg.DescribeConfigsResponse, error) { @@ -335,7 +411,7 @@ func (s *Service) getTopicsConfigs(ctx context.Context, configNames []string) (* }, } - return req.RequestWith(ctx, s.client) + return req.RequestWith(ctx, s.adminClient) } func createTopicConfig(cfgTopic EndToEndTopicConfig) []kmsg.CreateTopicsRequestTopicConfig { diff --git a/prometheus/collect_consumer_group_lags.go b/prometheus/collect_consumer_group_lags.go index c8686df..5e0c71e 100644 --- a/prometheus/collect_consumer_group_lags.go +++ b/prometheus/collect_consumer_group_lags.go @@ -20,6 +20,9 @@ type waterMark struct { } func (e *Exporter) collectConsumerGroupLags(ctx context.Context, ch chan<- prometheus.Metric) bool { + if !e.minionSvc.Cfg.ConsumerGroups.Enabled { + return true + } // Low Watermarks (at the moment they are not needed at all, they could be used to calculate the lag on partitions // that don't have any active offsets) lowWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -2) From ee72349a7a3e9eefe26153758834ba932d92f529 Mon Sep 17 00:00:00 2001 From: Adrian Coman <1664229+azun@users.noreply.github.com> Date: Tue, 10 Oct 2023 16:31:33 +0300 Subject: [PATCH 2/2] Comments and refactored topic management logic --- e2e/service.go | 3 ++- e2e/topic.go | 26 +++++++------------------- 2 files changed, 9 insertions(+), 20 deletions(-) diff --git a/e2e/service.go b/e2e/service.go index 1a09309..79082f6 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -157,6 +157,7 @@ func (s *Service) initKafka(ctx context.Context) error { func (s *Service) initReconcile(ctx context.Context) error { s.logger.Info("Starting reconcile") + // Create the KafkaAdmin client used for topic partition and leader reconciliation adminClient, err := s.kafkaSvc.CreateAndTestClient(ctx, s.logger, []kgo.Opt{}) if err != nil { return fmt.Errorf("failed to create kafka client for e2e: %w", err) @@ -169,7 +170,7 @@ func (s *Service) initReconcile(ctx context.Context) error { return fmt.Errorf("could not validate end-to-end topic: %w", err) } - // finally start everything else (producing, consuming, continuous validation, consumer group tracking) + // start topic creation/partition/leader reconciliation loop go s.startReconciliation(ctx) return nil diff --git a/e2e/topic.go b/e2e/topic.go index f5958ac..0e06f31 100644 --- a/e2e/topic.go +++ b/e2e/topic.go @@ -13,6 +13,8 @@ import ( // Check our end-to-end test topic and adapt accordingly if something does not match our expectations. // - does it exist? +// - does configuration allow topic management? +// // - is it configured correctly? // - does it have enough partitions? 
// - is the replicationFactor correct? @@ -37,6 +39,10 @@ func (s *Service) validateManagementTopic(ctx context.Context) error { case kerr.UnknownTopicOrPartition: // UnknownTopicOrPartition (Error code 3) means that the topic does not exist. // When the topic doesn't exist, continue to create it further down in the code. + if !s.config.TopicManagement.Enabled { + return fmt.Errorf("the configured end to end topic does not exist. The topic will not be created " + + "because topic management is disabled") + } topicExists = false default: // If the topic (possibly) exists, but there's an error, then this should result in a fail @@ -45,24 +51,13 @@ func (s *Service) validateManagementTopic(ctx context.Context) error { // Create topic if it doesn't exist if !topicExists { - if !s.config.TopicManagement.Enabled { - return fmt.Errorf("the configured end to end topic does not exist. The topic will not be created " + - "because topic management is disabled") - } - if err = s.createManagementTopic(ctx, meta); err != nil { return err } - } - - if !s.config.TopicManagement.Enabled { - topicMetadata, err := s.getTopicMetadata(ctx) + meta, err = s.getTopicMetadata(ctx) if err != nil { return fmt.Errorf("could not get topic metadata after validation: %w", err) } - partitions := len(topicMetadata.Topics[0].Partitions) - s.partitionCount = partitions - return nil } alterReq, createReq, pleReq, err := s.calculatePartitionReassignments(meta) @@ -85,13 +80,6 @@ func (s *Service) validateManagementTopic(ctx context.Context) error { return fmt.Errorf("failed to elect partitions: %w", err) } - topicMetadata, err := s.getTopicMetadata(ctx) - if err != nil { - return fmt.Errorf("could not get topic metadata after validation: %w", err) - } - partitions := len(topicMetadata.Topics[0].Partitions) - s.partitionCount = partitions - logger.Info("end-to-end topic is valid.") return nil
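
The headline change in this patch series is the periodic client refresh: when the new e2e config key reconnectInterval is greater than zero (it defaults to 0, i.e. disabled), Start launches reconnectLoop, which repeatedly runs the probes under a context that expires after the interval, closes the Kafka client, and begins a fresh run. The snippet below is a minimal, self-contained Go sketch of that pattern, not kminion code: runOnce and stop are hypothetical placeholders standing in for the service's run and Stop methods, and the explicit cancel handling is an illustrative addition rather than something the patch itself does.

package main

import (
	"context"
	"log"
	"time"
)

// reconnectLoop runs one probe cycle under a deadline of reconnectInterval, tears the
// client state down via stop, and starts over until the parent context is cancelled.
func reconnectLoop(parent context.Context, reconnectInterval time.Duration,
	runOnce func(context.Context) error, stop func()) {
	for {
		ctx, cancel := context.WithTimeout(parent, reconnectInterval)
		if err := runOnce(ctx); err != nil {
			log.Printf("e2e run ended with error: %v", err)
		}
		<-ctx.Done() // wait until the interval elapses or the parent is cancelled
		cancel()     // release the timeout's resources right away
		stop()
		if parent.Err() != nil {
			return
		}
		log.Println("restarting e2e probes with a fresh client")
	}
}

func main() {
	// Run the loop for ~5s with a 2s reconnect interval so the example terminates.
	parent, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	reconnectLoop(parent, 2*time.Second,
		func(ctx context.Context) error { <-ctx.Done(); return nil }, // stand-in for Service.run
		func() { log.Println("kafka client closed") },                // stand-in for Service.Stop
	)
}

One design note on the sketch: calling the cancel function returned by context.WithTimeout, instead of discarding it, releases the timer as soon as a cycle ends; the patch's loop relies on the timeout itself expiring, a pattern the go vet lostcancel check typically reports as a possible context leak.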