diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index 017177af..60798f0e 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -101,30 +101,53 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi for i, topic := range topics { topicNames[i] = fmt.Sprintf("%s%s", namespacePrefix, topic) } - responses, err := c.m.adminClient.CreateTopics( - ctx, + + existing, err := c.m.adminClient.ListTopics(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return fmt.Errorf("failed to list kafka topics: %w", err) + } + + // missingTopics contains topics which need to be created. + missingTopics := make([]string, 0, len(topicNames)) + // updatePartitions contains topics which partitions' need to be updated. + updatePartitions := make([]string, 0, len(topicNames)) + // existingTopics contains the existing topics, used by AlterTopicConfigs. + existingTopics := make([]string, 0, len(topicNames)) + for _, wantTopic := range topicNames { + if !existing.Has(wantTopic) { + missingTopics = append(missingTopics, wantTopic) + continue + } + existingTopics = append(existingTopics, wantTopic) + if len(existing[wantTopic].Partitions) < c.partitionCount { + updatePartitions = append(updatePartitions, wantTopic) + } + } + + responses, err := c.m.adminClient.CreateTopics(ctx, int32(c.partitionCount), -1, // default.replication.factor c.topicConfigs, - topicNames..., + missingTopics..., ) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return fmt.Errorf("failed to create kafka topics: %w", err) } - createTopicParamsFields := []zap.Field{ + loggerFields := []zap.Field{ zap.Int("partition_count", c.partitionCount), } if len(c.origTopicConfigs) > 0 { - createTopicParamsFields = append(createTopicParamsFields, + loggerFields = append(loggerFields, zap.Reflect("topic_configs", c.origTopicConfigs), ) } - existingTopics := make([]string, 0, len(topicNames)) var updateErrors []error - logger := c.m.cfg.Logger.With(createTopicParamsFields...) + logger := c.m.cfg.Logger.With(loggerFields...) for _, response := range responses.Sorted() { topicName := strings.TrimPrefix(response.Topic, namespacePrefix) if err := response.Err; err != nil { @@ -137,7 +160,6 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi span.AddEvent("kafka topic already exists", trace.WithAttributes( semconv.MessagingDestinationKey.String(topicName), )) - existingTopics = append(existingTopics, response.Topic) } else { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -150,13 +172,17 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi logger.Info("created kafka topic", zap.String("topic", topicName)) } - if len(existingTopics) > 0 { - updateResp, err := c.m.adminClient.UpdatePartitions(ctx, c.partitionCount, existingTopics...) + // Update the topic partitions. + if len(updatePartitions) > 0 { + updateResp, err := c.m.adminClient.UpdatePartitions(ctx, + c.partitionCount, + updatePartitions..., + ) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return fmt.Errorf("failed to update partitions for kafka topics: %v: %w", - existingTopics, err, + updatePartitions, err, ) } for _, response := range updateResp.Sorted() { @@ -180,37 +206,37 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi zap.String("topic", topicName), ) } - if len(c.topicConfigs) > 0 { - alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs)) - for k, v := range c.topicConfigs { - alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v}) - } - alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, alterCfg, - existingTopics..., + } + if len(existingTopics) > 0 && len(c.topicConfigs) > 0 { + alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs)) + for k, v := range c.topicConfigs { + alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v}) + } + alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, + alterCfg, existingTopics..., + ) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return fmt.Errorf( + "failed to update configuration for kafka topics: %v:%w", + existingTopics, err, ) - if err != nil { + } + for _, response := range alterResp { + topicName := strings.TrimPrefix(response.Name, namespacePrefix) + if err := response.Err; err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return fmt.Errorf( - "failed to update configuration for kafka topics: %v:%w", - existingTopics, err, - ) - } - for _, response := range alterResp { - topicName := strings.TrimPrefix(response.Name, namespacePrefix) - if err := response.Err; err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - updateErrors = append(updateErrors, fmt.Errorf( - "failed to alter configuration for topic %q: %w", - topicName, err, - )) - continue - } - logger.Info("altered configuration for kafka topic", - zap.String("topic", topicName), - ) + updateErrors = append(updateErrors, fmt.Errorf( + "failed to alter configuration for topic %q: %w", + topicName, err, + )) + continue } + logger.Info("altered configuration for kafka topic", + zap.String("topic", topicName), + ) } } return errors.Join(updateErrors...) diff --git a/kafka/topiccreator_test.go b/kafka/topiccreator_test.go index 75c8da6c..a9620c82 100644 --- a/kafka/topiccreator_test.go +++ b/kafka/topiccreator_test.go @@ -21,6 +21,7 @@ import ( "context" "strings" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -67,9 +68,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) { commonConfig.Logger = zap.New(core) commonConfig.TracerProvider = tp - m, err := NewManager(ManagerConfig{ - CommonConfig: commonConfig, - }) + m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) require.NoError(t, err) t.Cleanup(func() { m.Close() }) c, err := m.NewTopicCreator(TopicCreatorConfig{ @@ -86,7 +85,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) { cluster.ControlKey(kmsg.CreateTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { createTopicsRequest = req.(*kmsg.CreateTopicsRequest) return &kmsg.CreateTopicsResponse{ - Version: createTopicsRequest.Version, + Version: req.GetVersion(), Topics: []kmsg.CreateTopicsResponseTopic{{ Topic: "name_space-topic1", ErrorCode: kerr.TopicAlreadyExists.Code, @@ -110,33 +109,47 @@ func TestTopicCreatorCreateTopics(t *testing.T) { var createPartitionsRequest *kmsg.CreatePartitionsRequest cluster.ControlKey(kmsg.CreatePartitions.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { createPartitionsRequest = req.(*kmsg.CreatePartitionsRequest) - return &kmsg.CreatePartitionsResponse{ - Version: createPartitionsRequest.Version, - Topics: []kmsg.CreatePartitionsResponseTopic{ - {Topic: "name_space-topic1"}, - {Topic: "name_space-topic4"}, - }, - }, nil, true + res := kmsg.CreatePartitionsResponse{Version: req.GetVersion()} + for _, t := range createPartitionsRequest.Topics { + res.Topics = append(res.Topics, kmsg.CreatePartitionsResponseTopic{ + Topic: t.Topic, + }) + } + return &res, nil, true }) + // Since topic 1 and 4 already exist, their configuration is altered. var alterConfigsRequest *kmsg.IncrementalAlterConfigsRequest cluster.ControlKey(kmsg.IncrementalAlterConfigs.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { alterConfigsRequest = req.(*kmsg.IncrementalAlterConfigsRequest) - return &kmsg.IncrementalAlterConfigsResponse{ - Version: alterConfigsRequest.Version, - Resources: []kmsg.IncrementalAlterConfigsResponseResource{ - { - ResourceName: "name_space-topic1", - ResourceType: kmsg.ConfigResourceTypeTopic, - }, - { - ResourceName: "name_space-topic4", - ResourceType: kmsg.ConfigResourceTypeTopic, - }, - }, + res := kmsg.IncrementalAlterConfigsResponse{Version: req.GetVersion()} + for _, r := range alterConfigsRequest.Resources { + res.Resources = append(res.Resources, kmsg.IncrementalAlterConfigsResponseResource{ + ResourceType: r.ResourceType, + ResourceName: r.ResourceName, + }) + } + return &res, nil, true + }) + + // Allow some time for the ForceMetadataRefresh to run. + <-time.After(10 * time.Millisecond) + + // Simulate topic0 already exists in Kafka. + cluster.ControlKey(kmsg.Metadata.Int16(), func(r kmsg.Request) (kmsg.Response, error, bool) { + return &kmsg.MetadataResponse{ + Version: r.GetVersion(), + Topics: []kmsg.MetadataResponseTopic{{ + Topic: kmsg.StringPtr("name_space-topic0"), + TopicID: [16]byte{111}, + Partitions: []kmsg.MetadataResponseTopicPartition{{ + Partition: 0, + }}, + }}, }, nil, true }) - err = c.CreateTopics(context.Background(), "topic1", "topic2", "topic3", "topic4") + + err = c.CreateTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4") require.Error(t, err) assert.EqualError(t, err, `failed to create topic "topic2": `+ `INVALID_TOPIC_EXCEPTION: The request attempted to perform an operation on an invalid topic.`, @@ -177,6 +190,23 @@ func TestTopicCreatorCreateTopics(t *testing.T) { }}, }}, createTopicsRequest.Topics) + // Ensure only `topic0` partitions are updated since it already exists. + assert.Equal(t, []kmsg.CreatePartitionsRequestTopic{ + {Topic: "name_space-topic0", Count: 123}, + }, createPartitionsRequest.Topics) + + // Ensure only the existing topic config is updated since it already exists. + assert.Len(t, alterConfigsRequest.Resources, 1) + assert.Equal(t, []kmsg.IncrementalAlterConfigsRequestResource{{ + ResourceType: kmsg.ConfigResourceTypeTopic, + ResourceName: "name_space-topic0", + Configs: []kmsg.IncrementalAlterConfigsRequestResourceConfig{{ + Name: "retention.ms", + Value: kmsg.StringPtr("123"), + }}, + }}, alterConfigsRequest.Resources) + + // Log assertions. matchingLogs := observedLogs.FilterFieldKey("topic") for _, ml := range matchingLogs.AllUntimed() { t.Log(ml.Message) @@ -218,52 +248,20 @@ func TestTopicCreatorCreateTopics(t *testing.T) { zap.String("topic", "topic3"), }, }, { - Entry: zapcore.Entry{ - Level: zapcore.InfoLevel, - LoggerName: "kafka", - Message: "updated partitions for kafka topic", - }, - Context: []zapcore.Field{ - zap.String("namespace", "name_space"), - zap.Int("partition_count", 123), - zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic1"), - }, - }, { - Entry: zapcore.Entry{ - Level: zapcore.InfoLevel, - LoggerName: "kafka", - Message: "updated partitions for kafka topic", - }, - Context: []zapcore.Field{ - zap.String("namespace", "name_space"), - zap.Int("partition_count", 123), - zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic4"), - }, - }, { - Entry: zapcore.Entry{ - Level: zapcore.InfoLevel, - LoggerName: "kafka", - Message: "altered configuration for kafka topic", - }, + Entry: zapcore.Entry{LoggerName: "kafka", Message: "updated partitions for kafka topic"}, Context: []zapcore.Field{ zap.String("namespace", "name_space"), zap.Int("partition_count", 123), zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic1"), + zap.String("topic", "topic0"), }, }, { - Entry: zapcore.Entry{ - Level: zapcore.InfoLevel, - LoggerName: "kafka", - Message: "altered configuration for kafka topic", - }, + Entry: zapcore.Entry{LoggerName: "kafka", Message: "altered configuration for kafka topic"}, Context: []zapcore.Field{ zap.String("namespace", "name_space"), zap.Int("partition_count", 123), zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic4"), + zap.String("topic", "topic0"), }, }}, matchingLogs.AllUntimed(), cmpopts.SortSlices(func(a, b observer.LoggedEntry) bool { var ai, bi int