Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: manager: List topics before creating them #316

Merged
merged 6 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 64 additions & 38 deletions kafka/topiccreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,30 +101,53 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
for i, topic := range topics {
topicNames[i] = fmt.Sprintf("%s%s", namespacePrefix, topic)
}
responses, err := c.m.adminClient.CreateTopics(
ctx,

existing, err := c.m.adminClient.ListTopics(ctx)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to list kafka topics: %w", err)
}

// missingTopics contains topics which need to be created.
missingTopics := make([]string, 0, len(topicNames))
// updatePartitions contains topics which partitions' need to be updated.
updatePartitions := make([]string, 0, len(topicNames))
// existingTopics contains the existing topics, used by AlterTopicConfigs.
existingTopics := make([]string, 0, len(topicNames))
for _, wantTopic := range topicNames {
if !existing.Has(wantTopic) {
missingTopics = append(missingTopics, wantTopic)
continue
}
existingTopics = append(existingTopics, wantTopic)
if len(existing[wantTopic].Partitions) < c.partitionCount {
updatePartitions = append(updatePartitions, wantTopic)
}
}

responses, err := c.m.adminClient.CreateTopics(ctx,
int32(c.partitionCount),
-1, // default.replication.factor
c.topicConfigs,
topicNames...,
missingTopics...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to create kafka topics: %w", err)
}
createTopicParamsFields := []zap.Field{
loggerFields := []zap.Field{
zap.Int("partition_count", c.partitionCount),
}
if len(c.origTopicConfigs) > 0 {
createTopicParamsFields = append(createTopicParamsFields,
loggerFields = append(loggerFields,
zap.Reflect("topic_configs", c.origTopicConfigs),
)
}

existingTopics := make([]string, 0, len(topicNames))
var updateErrors []error
logger := c.m.cfg.Logger.With(createTopicParamsFields...)
logger := c.m.cfg.Logger.With(loggerFields...)
for _, response := range responses.Sorted() {
topicName := strings.TrimPrefix(response.Topic, namespacePrefix)
if err := response.Err; err != nil {
Expand All @@ -137,7 +160,6 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
span.AddEvent("kafka topic already exists", trace.WithAttributes(
semconv.MessagingDestinationKey.String(topicName),
))
existingTopics = append(existingTopics, response.Topic)
} else {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
Expand All @@ -150,13 +172,17 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
logger.Info("created kafka topic", zap.String("topic", topicName))
}

if len(existingTopics) > 0 {
updateResp, err := c.m.adminClient.UpdatePartitions(ctx, c.partitionCount, existingTopics...)
// Update the topic partitions.
if len(updatePartitions) > 0 {
updateResp, err := c.m.adminClient.UpdatePartitions(ctx,
c.partitionCount,
updatePartitions...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to update partitions for kafka topics: %v: %w",
existingTopics, err,
updatePartitions, err,
)
}
for _, response := range updateResp.Sorted() {
Expand All @@ -180,37 +206,37 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
zap.String("topic", topicName),
)
}
if len(c.topicConfigs) > 0 {
alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs))
for k, v := range c.topicConfigs {
alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v})
}
alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, alterCfg,
existingTopics...,
}
if len(existingTopics) > 0 && len(c.topicConfigs) > 0 {
alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs))
for k, v := range c.topicConfigs {
alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v})
}
alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx,
alterCfg, existingTopics...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf(
"failed to update configuration for kafka topics: %v:%w",
existingTopics, err,
)
if err != nil {
}
for _, response := range alterResp {
topicName := strings.TrimPrefix(response.Name, namespacePrefix)
if err := response.Err; err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf(
"failed to update configuration for kafka topics: %v:%w",
existingTopics, err,
)
}
for _, response := range alterResp {
topicName := strings.TrimPrefix(response.Name, namespacePrefix)
if err := response.Err; err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
updateErrors = append(updateErrors, fmt.Errorf(
"failed to alter configuration for topic %q: %w",
topicName, err,
))
continue
}
logger.Info("altered configuration for kafka topic",
zap.String("topic", topicName),
)
updateErrors = append(updateErrors, fmt.Errorf(
"failed to alter configuration for topic %q: %w",
topicName, err,
))
continue
}
logger.Info("altered configuration for kafka topic",
zap.String("topic", topicName),
)
}
}
return errors.Join(updateErrors...)
Expand Down
118 changes: 58 additions & 60 deletions kafka/topiccreator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -67,9 +68,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
commonConfig.Logger = zap.New(core)
commonConfig.TracerProvider = tp

m, err := NewManager(ManagerConfig{
CommonConfig: commonConfig,
})
m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
require.NoError(t, err)
t.Cleanup(func() { m.Close() })
c, err := m.NewTopicCreator(TopicCreatorConfig{
Expand All @@ -86,7 +85,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
cluster.ControlKey(kmsg.CreateTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
createTopicsRequest = req.(*kmsg.CreateTopicsRequest)
return &kmsg.CreateTopicsResponse{
Version: createTopicsRequest.Version,
Version: req.GetVersion(),
Topics: []kmsg.CreateTopicsResponseTopic{{
Topic: "name_space-topic1",
ErrorCode: kerr.TopicAlreadyExists.Code,
Expand All @@ -110,33 +109,47 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
var createPartitionsRequest *kmsg.CreatePartitionsRequest
cluster.ControlKey(kmsg.CreatePartitions.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
createPartitionsRequest = req.(*kmsg.CreatePartitionsRequest)
return &kmsg.CreatePartitionsResponse{
Version: createPartitionsRequest.Version,
Topics: []kmsg.CreatePartitionsResponseTopic{
{Topic: "name_space-topic1"},
{Topic: "name_space-topic4"},
},
}, nil, true
res := kmsg.CreatePartitionsResponse{Version: req.GetVersion()}
for _, t := range createPartitionsRequest.Topics {
res.Topics = append(res.Topics, kmsg.CreatePartitionsResponseTopic{
Topic: t.Topic,
})
}
return &res, nil, true
})

// Since topic 1 and 4 already exist, their configuration is altered.
var alterConfigsRequest *kmsg.IncrementalAlterConfigsRequest
cluster.ControlKey(kmsg.IncrementalAlterConfigs.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
alterConfigsRequest = req.(*kmsg.IncrementalAlterConfigsRequest)
return &kmsg.IncrementalAlterConfigsResponse{
Version: alterConfigsRequest.Version,
Resources: []kmsg.IncrementalAlterConfigsResponseResource{
{
ResourceName: "name_space-topic1",
ResourceType: kmsg.ConfigResourceTypeTopic,
},
{
ResourceName: "name_space-topic4",
ResourceType: kmsg.ConfigResourceTypeTopic,
},
},
res := kmsg.IncrementalAlterConfigsResponse{Version: req.GetVersion()}
for _, r := range alterConfigsRequest.Resources {
res.Resources = append(res.Resources, kmsg.IncrementalAlterConfigsResponseResource{
ResourceType: r.ResourceType,
ResourceName: r.ResourceName,
})
}
return &res, nil, true
})

// Allow some time for the ForceMetadataRefresh to run.
<-time.After(10 * time.Millisecond)

// Simulate topic0 already exists in Kafka.
cluster.ControlKey(kmsg.Metadata.Int16(), func(r kmsg.Request) (kmsg.Response, error, bool) {
return &kmsg.MetadataResponse{
Version: r.GetVersion(),
Topics: []kmsg.MetadataResponseTopic{{
Topic: kmsg.StringPtr("name_space-topic0"),
TopicID: [16]byte{111},
Partitions: []kmsg.MetadataResponseTopicPartition{{
Partition: 0,
}},
}},
}, nil, true
})
err = c.CreateTopics(context.Background(), "topic1", "topic2", "topic3", "topic4")

err = c.CreateTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4")
require.Error(t, err)
assert.EqualError(t, err, `failed to create topic "topic2": `+
`INVALID_TOPIC_EXCEPTION: The request attempted to perform an operation on an invalid topic.`,
Expand Down Expand Up @@ -177,6 +190,23 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
}},
}}, createTopicsRequest.Topics)

// Ensure only `topic0` partitions are updated since it already exists.
assert.Equal(t, []kmsg.CreatePartitionsRequestTopic{
{Topic: "name_space-topic0", Count: 123},
}, createPartitionsRequest.Topics)

// Ensure only the existing topic config is updated since it already exists.
assert.Len(t, alterConfigsRequest.Resources, 1)
assert.Equal(t, []kmsg.IncrementalAlterConfigsRequestResource{{
ResourceType: kmsg.ConfigResourceTypeTopic,
ResourceName: "name_space-topic0",
Configs: []kmsg.IncrementalAlterConfigsRequestResourceConfig{{
Name: "retention.ms",
Value: kmsg.StringPtr("123"),
}},
}}, alterConfigsRequest.Resources)

// Log assertions.
matchingLogs := observedLogs.FilterFieldKey("topic")
for _, ml := range matchingLogs.AllUntimed() {
t.Log(ml.Message)
Expand Down Expand Up @@ -218,52 +248,20 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
zap.String("topic", "topic3"),
},
}, {
Entry: zapcore.Entry{
Level: zapcore.InfoLevel,
LoggerName: "kafka",
Message: "updated partitions for kafka topic",
},
Context: []zapcore.Field{
zap.String("namespace", "name_space"),
zap.Int("partition_count", 123),
zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
zap.String("topic", "topic1"),
},
}, {
Entry: zapcore.Entry{
Level: zapcore.InfoLevel,
LoggerName: "kafka",
Message: "updated partitions for kafka topic",
},
Context: []zapcore.Field{
zap.String("namespace", "name_space"),
zap.Int("partition_count", 123),
zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
zap.String("topic", "topic4"),
},
}, {
Entry: zapcore.Entry{
Level: zapcore.InfoLevel,
LoggerName: "kafka",
Message: "altered configuration for kafka topic",
},
Entry: zapcore.Entry{LoggerName: "kafka", Message: "updated partitions for kafka topic"},
Context: []zapcore.Field{
zap.String("namespace", "name_space"),
zap.Int("partition_count", 123),
zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
zap.String("topic", "topic1"),
zap.String("topic", "topic0"),
},
}, {
Entry: zapcore.Entry{
Level: zapcore.InfoLevel,
LoggerName: "kafka",
Message: "altered configuration for kafka topic",
},
Entry: zapcore.Entry{LoggerName: "kafka", Message: "altered configuration for kafka topic"},
Context: []zapcore.Field{
zap.String("namespace", "name_space"),
zap.Int("partition_count", 123),
zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
zap.String("topic", "topic4"),
zap.String("topic", "topic0"),
},
}}, matchingLogs.AllUntimed(), cmpopts.SortSlices(func(a, b observer.LoggedEntry) bool {
var ai, bi int
Expand Down