From 27f8c0e10facc214d7fd62827b29c177f3b542d7 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Mon, 30 Oct 2023 14:13:19 +0800 Subject: [PATCH 1/6] kafka: manager: List topics before creating them Updates the manager `CreateTopics` method to only issue the CreateTopics call if the topics don't exist in Kafka, preventing undesirable errors from being returned and logged. Signed-off-by: Marc Lopez Rubio --- kafka/topiccreator.go | 100 +++++++++++++++++++++++-------------- kafka/topiccreator_test.go | 48 +++++++++++++++--- 2 files changed, 104 insertions(+), 44 deletions(-) diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index 017177af..c9f69de2 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -101,30 +101,51 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi for i, topic := range topics { topicNames[i] = fmt.Sprintf("%s%s", namespacePrefix, topic) } - responses, err := c.m.adminClient.CreateTopics( - ctx, + + existing, err := c.m.adminClient.ListTopics(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return fmt.Errorf("failed to list kafka topics: %w", err) + } + + // Build two lists, one for the topics that haven't been created yet, and + // another one that can be used to update a topic's partitions. + missingTopics := make([]string, 0, len(topicNames)) + updatePartitions := make([]string, 0, len(topicNames)) + for _, wantTopic := range topicNames { + if !existing.Has(wantTopic) { + missingTopics = append(missingTopics, wantTopic) + continue + } + if len(existing[wantTopic].Partitions) < c.partitionCount { + updatePartitions = append(updatePartitions, wantTopic) + } + } + fmt.Printf("%+v\n", missingTopics) + + responses, err := c.m.adminClient.CreateTopics(ctx, int32(c.partitionCount), -1, // default.replication.factor c.topicConfigs, - topicNames..., + missingTopics..., ) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return fmt.Errorf("failed to create kafka topics: %w", err) } - createTopicParamsFields := []zap.Field{ + loggerFields := []zap.Field{ zap.Int("partition_count", c.partitionCount), } if len(c.origTopicConfigs) > 0 { - createTopicParamsFields = append(createTopicParamsFields, + loggerFields = append(loggerFields, zap.Reflect("topic_configs", c.origTopicConfigs), ) } - existingTopics := make([]string, 0, len(topicNames)) var updateErrors []error - logger := c.m.cfg.Logger.With(createTopicParamsFields...) + logger := c.m.cfg.Logger.With(loggerFields...) for _, response := range responses.Sorted() { topicName := strings.TrimPrefix(response.Topic, namespacePrefix) if err := response.Err; err != nil { @@ -137,7 +158,6 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi span.AddEvent("kafka topic already exists", trace.WithAttributes( semconv.MessagingDestinationKey.String(topicName), )) - existingTopics = append(existingTopics, response.Topic) } else { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -150,13 +170,17 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi logger.Info("created kafka topic", zap.String("topic", topicName)) } - if len(existingTopics) > 0 { - updateResp, err := c.m.adminClient.UpdatePartitions(ctx, c.partitionCount, existingTopics...) + // Update the topic partitions. + if len(updatePartitions) > 0 { + updateResp, err := c.m.adminClient.UpdatePartitions(ctx, + c.partitionCount, + updatePartitions..., + ) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return fmt.Errorf("failed to update partitions for kafka topics: %v: %w", - existingTopics, err, + updatePartitions, err, ) } for _, response := range updateResp.Sorted() { @@ -180,37 +204,37 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi zap.String("topic", topicName), ) } - if len(c.topicConfigs) > 0 { - alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs)) - for k, v := range c.topicConfigs { - alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v}) - } - alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, alterCfg, - existingTopics..., + } + if len(c.topicConfigs) > 0 { + alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs)) + for k, v := range c.topicConfigs { + alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v}) + } + alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, + alterCfg, topicNames..., + ) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return fmt.Errorf( + "failed to update configuration for kafka topics: %v:%w", + topicNames, err, ) - if err != nil { + } + for _, response := range alterResp { + topicName := strings.TrimPrefix(response.Name, namespacePrefix) + if err := response.Err; err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return fmt.Errorf( - "failed to update configuration for kafka topics: %v:%w", - existingTopics, err, - ) - } - for _, response := range alterResp { - topicName := strings.TrimPrefix(response.Name, namespacePrefix) - if err := response.Err; err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - updateErrors = append(updateErrors, fmt.Errorf( - "failed to alter configuration for topic %q: %w", - topicName, err, - )) - continue - } - logger.Info("altered configuration for kafka topic", - zap.String("topic", topicName), - ) + updateErrors = append(updateErrors, fmt.Errorf( + "failed to alter configuration for topic %q: %w", + topicName, err, + )) + continue } + logger.Info("altered configuration for kafka topic", + zap.String("topic", topicName), + ) } } return errors.Join(updateErrors...) diff --git a/kafka/topiccreator_test.go b/kafka/topiccreator_test.go index 75c8da6c..d6959727 100644 --- a/kafka/topiccreator_test.go +++ b/kafka/topiccreator_test.go @@ -67,9 +67,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) { commonConfig.Logger = zap.New(core) commonConfig.TracerProvider = tp - m, err := NewManager(ManagerConfig{ - CommonConfig: commonConfig, - }) + m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) require.NoError(t, err) t.Cleanup(func() { m.Close() }) c, err := m.NewTopicCreator(TopicCreatorConfig{ @@ -80,6 +78,19 @@ func TestTopicCreatorCreateTopics(t *testing.T) { }) require.NoError(t, err) + cluster.ControlKey(kmsg.Metadata.Int16(), func(r kmsg.Request) (kmsg.Response, error, bool) { + return &kmsg.MetadataResponse{ + Version: r.GetVersion(), + Topics: []kmsg.MetadataResponseTopic{{ + Topic: kmsg.StringPtr("name_space-topic0"), + TopicID: [16]byte{111}, + Partitions: []kmsg.MetadataResponseTopicPartition{{ + Partition: 0, + }}, + }}, + }, nil, true + }) + // Simulate a situation where topic1, topic4 exists, topic2 is invalid and // topic3 is successfully created. var createTopicsRequest *kmsg.CreateTopicsRequest @@ -111,8 +122,9 @@ func TestTopicCreatorCreateTopics(t *testing.T) { cluster.ControlKey(kmsg.CreatePartitions.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { createPartitionsRequest = req.(*kmsg.CreatePartitionsRequest) return &kmsg.CreatePartitionsResponse{ - Version: createPartitionsRequest.Version, + Version: req.GetVersion(), Topics: []kmsg.CreatePartitionsResponseTopic{ + {Topic: "name_space-topic0"}, {Topic: "name_space-topic1"}, {Topic: "name_space-topic4"}, }, @@ -123,8 +135,12 @@ func TestTopicCreatorCreateTopics(t *testing.T) { cluster.ControlKey(kmsg.IncrementalAlterConfigs.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { alterConfigsRequest = req.(*kmsg.IncrementalAlterConfigsRequest) return &kmsg.IncrementalAlterConfigsResponse{ - Version: alterConfigsRequest.Version, + Version: alterConfigsRequest.GetVersion(), Resources: []kmsg.IncrementalAlterConfigsResponseResource{ + { + ResourceName: "name_space-topic0", + ResourceType: kmsg.ConfigResourceTypeTopic, + }, { ResourceName: "name_space-topic1", ResourceType: kmsg.ConfigResourceTypeTopic, @@ -136,7 +152,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) { }, }, nil, true }) - err = c.CreateTopics(context.Background(), "topic1", "topic2", "topic3", "topic4") + err = c.CreateTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4") require.Error(t, err) assert.EqualError(t, err, `failed to create topic "topic2": `+ `INVALID_TOPIC_EXCEPTION: The request attempted to perform an operation on an invalid topic.`, @@ -177,6 +193,10 @@ func TestTopicCreatorCreateTopics(t *testing.T) { }}, }}, createTopicsRequest.Topics) + assert.Equal(t, []kmsg.CreatePartitionsRequestTopic{ + {Topic: "name_space-topic0", Count: 123}, + }, createPartitionsRequest.Topics) + matchingLogs := observedLogs.FilterFieldKey("topic") for _, ml := range matchingLogs.AllUntimed() { t.Log(ml.Message) @@ -241,6 +261,22 @@ func TestTopicCreatorCreateTopics(t *testing.T) { zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), zap.String("topic", "topic4"), }, + }, { + Entry: zapcore.Entry{LoggerName: "kafka", Message: "updated partitions for kafka topic"}, + Context: []zapcore.Field{ + zap.String("namespace", "name_space"), + zap.Int("partition_count", 123), + zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), + zap.String("topic", "topic0"), + }, + }, { + Entry: zapcore.Entry{LoggerName: "kafka", Message: "altered configuration for kafka topic"}, + Context: []zapcore.Field{ + zap.String("namespace", "name_space"), + zap.Int("partition_count", 123), + zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), + zap.String("topic", "topic0"), + }, }, { Entry: zapcore.Entry{ Level: zapcore.InfoLevel, From 8acf0af6cbc25df656135df6b34f63a65c1bc98b Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 31 Oct 2023 10:47:15 +0800 Subject: [PATCH 2/6] Only alter existing topics Signed-off-by: Marc Lopez Rubio --- kafka/topiccreator.go | 7 +-- kafka/topiccreator_test.go | 106 +++++++++++-------------------------- 2 files changed, 36 insertions(+), 77 deletions(-) diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index c9f69de2..d2c00479 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -113,16 +113,17 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi // another one that can be used to update a topic's partitions. missingTopics := make([]string, 0, len(topicNames)) updatePartitions := make([]string, 0, len(topicNames)) + existingTopics := make([]string, 0, len(topicNames)) for _, wantTopic := range topicNames { if !existing.Has(wantTopic) { missingTopics = append(missingTopics, wantTopic) continue } + existingTopics = append(existingTopics, wantTopic) if len(existing[wantTopic].Partitions) < c.partitionCount { updatePartitions = append(updatePartitions, wantTopic) } } - fmt.Printf("%+v\n", missingTopics) responses, err := c.m.adminClient.CreateTopics(ctx, int32(c.partitionCount), @@ -211,14 +212,14 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v}) } alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, - alterCfg, topicNames..., + alterCfg, existingTopics..., ) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return fmt.Errorf( "failed to update configuration for kafka topics: %v:%w", - topicNames, err, + existingTopics, err, ) } for _, response := range alterResp { diff --git a/kafka/topiccreator_test.go b/kafka/topiccreator_test.go index d6959727..fbc932f0 100644 --- a/kafka/topiccreator_test.go +++ b/kafka/topiccreator_test.go @@ -78,6 +78,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) { }) require.NoError(t, err) + // Simulate topic0 already exists in Kafka. cluster.ControlKey(kmsg.Metadata.Int16(), func(r kmsg.Request) (kmsg.Response, error, bool) { return &kmsg.MetadataResponse{ Version: r.GetVersion(), @@ -97,7 +98,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) { cluster.ControlKey(kmsg.CreateTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { createTopicsRequest = req.(*kmsg.CreateTopicsRequest) return &kmsg.CreateTopicsResponse{ - Version: createTopicsRequest.Version, + Version: req.GetVersion(), Topics: []kmsg.CreateTopicsResponseTopic{{ Topic: "name_space-topic1", ErrorCode: kerr.TopicAlreadyExists.Code, @@ -121,37 +122,29 @@ func TestTopicCreatorCreateTopics(t *testing.T) { var createPartitionsRequest *kmsg.CreatePartitionsRequest cluster.ControlKey(kmsg.CreatePartitions.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { createPartitionsRequest = req.(*kmsg.CreatePartitionsRequest) - return &kmsg.CreatePartitionsResponse{ - Version: req.GetVersion(), - Topics: []kmsg.CreatePartitionsResponseTopic{ - {Topic: "name_space-topic0"}, - {Topic: "name_space-topic1"}, - {Topic: "name_space-topic4"}, - }, - }, nil, true + res := kmsg.CreatePartitionsResponse{Version: req.GetVersion()} + for _, t := range createPartitionsRequest.Topics { + res.Topics = append(res.Topics, kmsg.CreatePartitionsResponseTopic{ + Topic: t.Topic, + }) + } + return &res, nil, true }) + // Since topic 1 and 4 already exist, their configuration is altered. var alterConfigsRequest *kmsg.IncrementalAlterConfigsRequest cluster.ControlKey(kmsg.IncrementalAlterConfigs.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { alterConfigsRequest = req.(*kmsg.IncrementalAlterConfigsRequest) - return &kmsg.IncrementalAlterConfigsResponse{ - Version: alterConfigsRequest.GetVersion(), - Resources: []kmsg.IncrementalAlterConfigsResponseResource{ - { - ResourceName: "name_space-topic0", - ResourceType: kmsg.ConfigResourceTypeTopic, - }, - { - ResourceName: "name_space-topic1", - ResourceType: kmsg.ConfigResourceTypeTopic, - }, - { - ResourceName: "name_space-topic4", - ResourceType: kmsg.ConfigResourceTypeTopic, - }, - }, - }, nil, true + res := kmsg.IncrementalAlterConfigsResponse{Version: req.GetVersion()} + for _, r := range alterConfigsRequest.Resources { + res.Resources = append(res.Resources, kmsg.IncrementalAlterConfigsResponseResource{ + ResourceType: r.ResourceType, + ResourceName: r.ResourceName, + }) + } + return &res, nil, true }) + err = c.CreateTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4") require.Error(t, err) assert.EqualError(t, err, `failed to create topic "topic2": `+ @@ -193,10 +186,23 @@ func TestTopicCreatorCreateTopics(t *testing.T) { }}, }}, createTopicsRequest.Topics) + // Ensure only `topic0` partitions are updated since it already exists. assert.Equal(t, []kmsg.CreatePartitionsRequestTopic{ {Topic: "name_space-topic0", Count: 123}, }, createPartitionsRequest.Topics) + // Ensure only the existing topic config is updated since it already exists. + assert.Len(t, alterConfigsRequest.Resources, 1) + assert.Equal(t, []kmsg.IncrementalAlterConfigsRequestResource{{ + ResourceType: 2, + ResourceName: "name_space-topic0", + Configs: []kmsg.IncrementalAlterConfigsRequestResourceConfig{{ + Name: "retention.ms", + Value: kmsg.StringPtr("123"), + }}, + }}, alterConfigsRequest.Resources) + + // Log assertions. matchingLogs := observedLogs.FilterFieldKey("topic") for _, ml := range matchingLogs.AllUntimed() { t.Log(ml.Message) @@ -237,30 +243,6 @@ func TestTopicCreatorCreateTopics(t *testing.T) { zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), zap.String("topic", "topic3"), }, - }, { - Entry: zapcore.Entry{ - Level: zapcore.InfoLevel, - LoggerName: "kafka", - Message: "updated partitions for kafka topic", - }, - Context: []zapcore.Field{ - zap.String("namespace", "name_space"), - zap.Int("partition_count", 123), - zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic1"), - }, - }, { - Entry: zapcore.Entry{ - Level: zapcore.InfoLevel, - LoggerName: "kafka", - Message: "updated partitions for kafka topic", - }, - Context: []zapcore.Field{ - zap.String("namespace", "name_space"), - zap.Int("partition_count", 123), - zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic4"), - }, }, { Entry: zapcore.Entry{LoggerName: "kafka", Message: "updated partitions for kafka topic"}, Context: []zapcore.Field{ @@ -277,30 +259,6 @@ func TestTopicCreatorCreateTopics(t *testing.T) { zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), zap.String("topic", "topic0"), }, - }, { - Entry: zapcore.Entry{ - Level: zapcore.InfoLevel, - LoggerName: "kafka", - Message: "altered configuration for kafka topic", - }, - Context: []zapcore.Field{ - zap.String("namespace", "name_space"), - zap.Int("partition_count", 123), - zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic1"), - }, - }, { - Entry: zapcore.Entry{ - Level: zapcore.InfoLevel, - LoggerName: "kafka", - Message: "altered configuration for kafka topic", - }, - Context: []zapcore.Field{ - zap.String("namespace", "name_space"), - zap.Int("partition_count", 123), - zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic4"), - }, }}, matchingLogs.AllUntimed(), cmpopts.SortSlices(func(a, b observer.LoggedEntry) bool { var ai, bi int for i, v := range a.Context { From b20954dd34d5430781fc096dbb51d6ad598bc537 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 31 Oct 2023 10:50:12 +0800 Subject: [PATCH 3/6] Update comments Signed-off-by: Marc Lopez Rubio --- kafka/topiccreator.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index d2c00479..48e0ec35 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -109,10 +109,11 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi return fmt.Errorf("failed to list kafka topics: %w", err) } - // Build two lists, one for the topics that haven't been created yet, and - // another one that can be used to update a topic's partitions. + // missingTopics contains topics which need to be created. missingTopics := make([]string, 0, len(topicNames)) + // updatePartitions contains topics which partitions' need to be updated. updatePartitions := make([]string, 0, len(topicNames)) + // existingTopics contains the existing topics, used by AlterTopicConfigs. existingTopics := make([]string, 0, len(topicNames)) for _, wantTopic := range topicNames { if !existing.Has(wantTopic) { From a3a2dc0060f2f986ccb4c467e20c12ffef9262a9 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 31 Oct 2023 10:52:00 +0800 Subject: [PATCH 4/6] Update test Signed-off-by: Marc Lopez Rubio --- kafka/topiccreator_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/topiccreator_test.go b/kafka/topiccreator_test.go index fbc932f0..1aa3d9e6 100644 --- a/kafka/topiccreator_test.go +++ b/kafka/topiccreator_test.go @@ -194,7 +194,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) { // Ensure only the existing topic config is updated since it already exists. assert.Len(t, alterConfigsRequest.Resources, 1) assert.Equal(t, []kmsg.IncrementalAlterConfigsRequestResource{{ - ResourceType: 2, + ResourceType: kmsg.ConfigResourceTypeTopic, ResourceName: "name_space-topic0", Configs: []kmsg.IncrementalAlterConfigsRequestResourceConfig{{ Name: "retention.ms", From 51d9504484c92c0faef8ee9dc0c8267aec8a18ac Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 31 Oct 2023 11:13:37 +0800 Subject: [PATCH 5/6] Update kafka/topiccreator.go Co-authored-by: Andrew Wilkins --- kafka/topiccreator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index 48e0ec35..60798f0e 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -207,7 +207,7 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi ) } } - if len(c.topicConfigs) > 0 { + if len(existingTopics) > 0 && len(c.topicConfigs) > 0 { alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs)) for k, v := range c.topicConfigs { alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v}) From f489231ed98521d36b4c234b628e7fa2c112c18f Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 31 Oct 2023 13:13:14 +0800 Subject: [PATCH 6/6] allow ForceMetadataRefresh to run before hijack Signed-off-by: Marc Lopez Rubio --- kafka/topiccreator_test.go | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/kafka/topiccreator_test.go b/kafka/topiccreator_test.go index 1aa3d9e6..a9620c82 100644 --- a/kafka/topiccreator_test.go +++ b/kafka/topiccreator_test.go @@ -21,6 +21,7 @@ import ( "context" "strings" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -78,20 +79,6 @@ func TestTopicCreatorCreateTopics(t *testing.T) { }) require.NoError(t, err) - // Simulate topic0 already exists in Kafka. - cluster.ControlKey(kmsg.Metadata.Int16(), func(r kmsg.Request) (kmsg.Response, error, bool) { - return &kmsg.MetadataResponse{ - Version: r.GetVersion(), - Topics: []kmsg.MetadataResponseTopic{{ - Topic: kmsg.StringPtr("name_space-topic0"), - TopicID: [16]byte{111}, - Partitions: []kmsg.MetadataResponseTopicPartition{{ - Partition: 0, - }}, - }}, - }, nil, true - }) - // Simulate a situation where topic1, topic4 exists, topic2 is invalid and // topic3 is successfully created. var createTopicsRequest *kmsg.CreateTopicsRequest @@ -145,6 +132,23 @@ func TestTopicCreatorCreateTopics(t *testing.T) { return &res, nil, true }) + // Allow some time for the ForceMetadataRefresh to run. + <-time.After(10 * time.Millisecond) + + // Simulate topic0 already exists in Kafka. + cluster.ControlKey(kmsg.Metadata.Int16(), func(r kmsg.Request) (kmsg.Response, error, bool) { + return &kmsg.MetadataResponse{ + Version: r.GetVersion(), + Topics: []kmsg.MetadataResponseTopic{{ + Topic: kmsg.StringPtr("name_space-topic0"), + TopicID: [16]byte{111}, + Partitions: []kmsg.MetadataResponseTopicPartition{{ + Partition: 0, + }}, + }}, + }, nil, true + }) + err = c.CreateTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4") require.Error(t, err) assert.EqualError(t, err, `failed to create topic "topic2": `+