Skip to content

Commit

Permalink
kafka: manager: List topics before creating them (#316)
Browse files Browse the repository at this point in the history
Updates the manager `CreateTopics` method to only issue the CreateTopics
call if the topics don't exist in Kafka, preventing undesirable errors
from being returned and logged.

---------

Signed-off-by: Marc Lopez Rubio <[email protected]>
Co-authored-by: Andrew Wilkins <[email protected]>
  • Loading branch information
marclop and axw authored Oct 31, 2023
1 parent 8c6a813 commit f628323
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 98 deletions.
102 changes: 64 additions & 38 deletions kafka/topiccreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,30 +101,53 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
for i, topic := range topics {
topicNames[i] = fmt.Sprintf("%s%s", namespacePrefix, topic)
}
responses, err := c.m.adminClient.CreateTopics(
ctx,

existing, err := c.m.adminClient.ListTopics(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to list kafka topics: %w", err)
}

// missingTopics contains topics which need to be created.
missingTopics := make([]string, 0, len(topicNames))
// updatePartitions contains topics which partitions' need to be updated.
updatePartitions := make([]string, 0, len(topicNames))
// existingTopics contains the existing topics, used by AlterTopicConfigs.
existingTopics := make([]string, 0, len(topicNames))
for _, wantTopic := range topicNames {
if !existing.Has(wantTopic) {
missingTopics = append(missingTopics, wantTopic)
continue
}
existingTopics = append(existingTopics, wantTopic)
if len(existing[wantTopic].Partitions) < c.partitionCount {
updatePartitions = append(updatePartitions, wantTopic)
}
}

responses, err := c.m.adminClient.CreateTopics(ctx,
int32(c.partitionCount),
-1, // default.replication.factor
c.topicConfigs,
topicNames...,
missingTopics...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to create kafka topics: %w", err)
}
createTopicParamsFields := []zap.Field{
loggerFields := []zap.Field{
zap.Int("partition_count", c.partitionCount),
}
if len(c.origTopicConfigs) > 0 {
createTopicParamsFields = append(createTopicParamsFields,
loggerFields = append(loggerFields,
zap.Reflect("topic_configs", c.origTopicConfigs),
)
}

existingTopics := make([]string, 0, len(topicNames))
var updateErrors []error
logger := c.m.cfg.Logger.With(createTopicParamsFields...)
logger := c.m.cfg.Logger.With(loggerFields...)
for _, response := range responses.Sorted() {
topicName := strings.TrimPrefix(response.Topic, namespacePrefix)
if err := response.Err; err != nil {
Expand All @@ -137,7 +160,6 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
span.AddEvent("kafka topic already exists", trace.WithAttributes(
semconv.MessagingDestinationKey.String(topicName),
))
existingTopics = append(existingTopics, response.Topic)
} else {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
Expand All @@ -150,13 +172,17 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
logger.Info("created kafka topic", zap.String("topic", topicName))
}

if len(existingTopics) > 0 {
updateResp, err := c.m.adminClient.UpdatePartitions(ctx, c.partitionCount, existingTopics...)
// Update the topic partitions.
if len(updatePartitions) > 0 {
updateResp, err := c.m.adminClient.UpdatePartitions(ctx,
c.partitionCount,
updatePartitions...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to update partitions for kafka topics: %v: %w",
existingTopics, err,
updatePartitions, err,
)
}
for _, response := range updateResp.Sorted() {
Expand All @@ -180,37 +206,37 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
zap.String("topic", topicName),
)
}
if len(c.topicConfigs) > 0 {
alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs))
for k, v := range c.topicConfigs {
alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v})
}
alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, alterCfg,
existingTopics...,
}
if len(existingTopics) > 0 && len(c.topicConfigs) > 0 {
alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs))
for k, v := range c.topicConfigs {
alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v})
}
alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx,
alterCfg, existingTopics...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf(
"failed to update configuration for kafka topics: %v:%w",
existingTopics, err,
)
if err != nil {
}
for _, response := range alterResp {
topicName := strings.TrimPrefix(response.Name, namespacePrefix)
if err := response.Err; err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf(
"failed to update configuration for kafka topics: %v:%w",
existingTopics, err,
)
}
for _, response := range alterResp {
topicName := strings.TrimPrefix(response.Name, namespacePrefix)
if err := response.Err; err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
updateErrors = append(updateErrors, fmt.Errorf(
"failed to alter configuration for topic %q: %w",
topicName, err,
))
continue
}
logger.Info("altered configuration for kafka topic",
zap.String("topic", topicName),
)
updateErrors = append(updateErrors, fmt.Errorf(
"failed to alter configuration for topic %q: %w",
topicName, err,
))
continue
}
logger.Info("altered configuration for kafka topic",
zap.String("topic", topicName),
)
}
}
return errors.Join(updateErrors...)
Expand Down
118 changes: 58 additions & 60 deletions kafka/topiccreator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -67,9 +68,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
commonConfig.Logger = zap.New(core)
commonConfig.TracerProvider = tp

m, err := NewManager(ManagerConfig{
CommonConfig: commonConfig,
})
m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
require.NoError(t, err)
t.Cleanup(func() { m.Close() })
c, err := m.NewTopicCreator(TopicCreatorConfig{
Expand All @@ -86,7 +85,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
cluster.ControlKey(kmsg.CreateTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
createTopicsRequest = req.(*kmsg.CreateTopicsRequest)
return &kmsg.CreateTopicsResponse{
Version: createTopicsRequest.Version,
Version: req.GetVersion(),
Topics: []kmsg.CreateTopicsResponseTopic{{
Topic: "name_space-topic1",
ErrorCode: kerr.TopicAlreadyExists.Code,
Expand All @@ -110,33 +109,47 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
var createPartitionsRequest *kmsg.CreatePartitionsRequest
cluster.ControlKey(kmsg.CreatePartitions.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
createPartitionsRequest = req.(*kmsg.CreatePartitionsRequest)
return &kmsg.CreatePartitionsResponse{
Version: createPartitionsRequest.Version,
Topics: []kmsg.CreatePartitionsResponseTopic{
{Topic: "name_space-topic1"},
{Topic: "name_space-topic4"},
},
}, nil, true
res := kmsg.CreatePartitionsResponse{Version: req.GetVersion()}
for _, t := range createPartitionsRequest.Topics {
res.Topics = append(res.Topics, kmsg.CreatePartitionsResponseTopic{
Topic: t.Topic,
})
}
return &res, nil, true
})

// Since topic 1 and 4 already exist, their configuration is altered.
var alterConfigsRequest *kmsg.IncrementalAlterConfigsRequest
cluster.ControlKey(kmsg.IncrementalAlterConfigs.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
alterConfigsRequest = req.(*kmsg.IncrementalAlterConfigsRequest)
return &kmsg.IncrementalAlterConfigsResponse{
Version: alterConfigsRequest.Version,
Resources: []kmsg.IncrementalAlterConfigsResponseResource{
{
ResourceName: "name_space-topic1",
ResourceType: kmsg.ConfigResourceTypeTopic,
},
{
ResourceName: "name_space-topic4",
ResourceType: kmsg.ConfigResourceTypeTopic,
},
},
res := kmsg.IncrementalAlterConfigsResponse{Version: req.GetVersion()}
for _, r := range alterConfigsRequest.Resources {
res.Resources = append(res.Resources, kmsg.IncrementalAlterConfigsResponseResource{
ResourceType: r.ResourceType,
ResourceName: r.ResourceName,
})
}
return &res, nil, true
})

// Allow some time for the ForceMetadataRefresh to run.
<-time.After(10 * time.Millisecond)

// Simulate topic0 already exists in Kafka.
cluster.ControlKey(kmsg.Metadata.Int16(), func(r kmsg.Request) (kmsg.Response, error, bool) {
return &kmsg.MetadataResponse{
Version: r.GetVersion(),
Topics: []kmsg.MetadataResponseTopic{{
Topic: kmsg.StringPtr("name_space-topic0"),
TopicID: [16]byte{111},
Partitions: []kmsg.MetadataResponseTopicPartition{{
Partition: 0,
}},
}},
}, nil, true
})
err = c.CreateTopics(context.Background(), "topic1", "topic2", "topic3", "topic4")

err = c.CreateTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4")
require.Error(t, err)
assert.EqualError(t, err, `failed to create topic "topic2": `+
`INVALID_TOPIC_EXCEPTION: The request attempted to perform an operation on an invalid topic.`,
Expand Down Expand Up @@ -177,6 +190,23 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
}},
}}, createTopicsRequest.Topics)

// Ensure only `topic0` partitions are updated since it already exists.
assert.Equal(t, []kmsg.CreatePartitionsRequestTopic{
{Topic: "name_space-topic0", Count: 123},
}, createPartitionsRequest.Topics)

// Ensure only the existing topic config is updated since it already exists.
assert.Len(t, alterConfigsRequest.Resources, 1)
assert.Equal(t, []kmsg.IncrementalAlterConfigsRequestResource{{
ResourceType: kmsg.ConfigResourceTypeTopic,
ResourceName: "name_space-topic0",
Configs: []kmsg.IncrementalAlterConfigsRequestResourceConfig{{
Name: "retention.ms",
Value: kmsg.StringPtr("123"),
}},
}}, alterConfigsRequest.Resources)

// Log assertions.
matchingLogs := observedLogs.FilterFieldKey("topic")
for _, ml := range matchingLogs.AllUntimed() {
t.Log(ml.Message)
Expand Down Expand Up @@ -218,52 +248,20 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
zap.String("topic", "topic3"),
},
}, {
Entry: zapcore.Entry{
Level: zapcore.InfoLevel,
LoggerName: "kafka",
Message: "updated partitions for kafka topic",
},
Context: []zapcore.Field{
zap.String("namespace", "name_space"),
zap.Int("partition_count", 123),
zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
zap.String("topic", "topic1"),
},
}, {
Entry: zapcore.Entry{
Level: zapcore.InfoLevel,
LoggerName: "kafka",
Message: "updated partitions for kafka topic",
},
Context: []zapcore.Field{
zap.String("namespace", "name_space"),
zap.Int("partition_count", 123),
zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
zap.String("topic", "topic4"),
},
}, {
Entry: zapcore.Entry{
Level: zapcore.InfoLevel,
LoggerName: "kafka",
Message: "altered configuration for kafka topic",
},
Entry: zapcore.Entry{LoggerName: "kafka", Message: "updated partitions for kafka topic"},
Context: []zapcore.Field{
zap.String("namespace", "name_space"),
zap.Int("partition_count", 123),
zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
zap.String("topic", "topic1"),
zap.String("topic", "topic0"),
},
}, {
Entry: zapcore.Entry{
Level: zapcore.InfoLevel,
LoggerName: "kafka",
Message: "altered configuration for kafka topic",
},
Entry: zapcore.Entry{LoggerName: "kafka", Message: "altered configuration for kafka topic"},
Context: []zapcore.Field{
zap.String("namespace", "name_space"),
zap.Int("partition_count", 123),
zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
zap.String("topic", "topic4"),
zap.String("topic", "topic0"),
},
}}, matchingLogs.AllUntimed(), cmpopts.SortSlices(func(a, b observer.LoggedEntry) bool {
var ai, bi int
Expand Down

0 comments on commit f628323

Please sign in to comment.