kafka: allow users to set sarama's Metadata parameters (sync from the internal merge request 1442) #33

Merged: 1 commit, Dec 11, 2024
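
This change surfaces sarama's Metadata settings both as UserConfig fields and as address query parameters (metadataRetryMax, metadataRetryBackoff, metadataRefreshFrequency, metadataFull, metadataAllowAutoTopicCreation). Below is a minimal usage sketch, not part of the diff, assuming the address format exercised in kafka/config_test.go and a hypothetical import path for this package; per the new parsers, metadataRetryBackoff is read as milliseconds and metadataRefreshFrequency as seconds.

package main

import (
    "fmt"
    "log"

    "example.com/yourmodule/kafka" // hypothetical import path for this package
)

func main() {
    // Query keys mirror the new parsers in kafka/config_parser.go.
    addr := "127.0.0.1:9092?topic=Topic1&clientid=client1" +
        "&metadataRetryMax=3&metadataRetryBackoff=250&metadataRefreshFrequency=600" +
        "&metadataFull=false&metadataAllowAutoTopicCreation=true"

    cfg, err := kafka.ParseAddress(addr)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(cfg.MetadataRetryMax, cfg.MetadataRetryBackoff, cfg.MetadataRefreshFrequency)
}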
86 changes: 49 additions & 37 deletions kafka/config.go
@@ -45,18 +45,23 @@ type UserConfig struct {
     ScramClient *LSCRAMClient // LSCRAM safety certification
     // The maximum number of retries on failure,
     // the default is 0: retry all the time <0 means no retry
-    MaxRetry               int
-    NetMaxOpenRequests     int // Maximum number of requests
-    MaxProcessingTime      time.Duration
-    NetDailTimeout         time.Duration
-    NetReadTimeout         time.Duration
-    NetWriteTimeout        time.Duration
-    GroupSessionTimeout    time.Duration
-    GroupRebalanceTimeout  time.Duration
-    GroupRebalanceRetryMax int
-    IsolationLevel         sarama.IsolationLevel
-    RetryInterval          time.Duration // Retry Interval Works with MaxRetry
-    ProducerRetry          struct {
+    MaxRetry                       int
+    NetMaxOpenRequests             int // Maximum number of requests
+    MaxProcessingTime              time.Duration
+    NetDailTimeout                 time.Duration
+    NetReadTimeout                 time.Duration
+    NetWriteTimeout                time.Duration
+    GroupSessionTimeout            time.Duration
+    GroupRebalanceTimeout          time.Duration
+    GroupRebalanceRetryMax         int
+    MetadataRetryMax               int
+    MetadataRetryBackoff           time.Duration
+    MetadataRefreshFrequency       time.Duration
+    MetadataFull                   bool
+    MetadataAllowAutoTopicCreation bool
+    IsolationLevel                 sarama.IsolationLevel
+    RetryInterval                  time.Duration // Retry Interval Works with MaxRetry
+    ProducerRetry                  struct {
         Max           int           // Maximum number of retries
         RetryInterval time.Duration // RetryInterval retry interval
     }
@@ -83,9 +88,11 @@ func (uc *UserConfig) getServerConfig() *sarama.Config {
         sc.ClientID = uc.ClientID
     }
 
-    sc.Metadata.Full = false // Disable pulling all metadata
-    sc.Metadata.Retry.Max = 1 // Metadata Update Repeat Times
-    sc.Metadata.Retry.Backoff = time.Second // Metadata update wait time
+    sc.Metadata.Retry.Max = uc.MetadataRetryMax
+    sc.Metadata.Retry.Backoff = uc.MetadataRetryBackoff
+    sc.Metadata.RefreshFrequency = uc.MetadataRefreshFrequency
+    sc.Metadata.Full = uc.MetadataFull
+    sc.Metadata.AllowAutoTopicCreation = uc.MetadataAllowAutoTopicCreation
 
     sc.Net.MaxOpenRequests = uc.NetMaxOpenRequests
     sc.Net.DialTimeout = uc.NetDailTimeout
@@ -129,28 +136,33 @@ func GetDefaultConfig() *UserConfig {
         // The maximum waiting time for a single consumption pull request.
         // The maximum wait time will only wait if there is no recent data.
         // This value should be set larger to reduce the consumption of empty requests on the QPS of the server.
-        MaxWaitTime:            time.Second,
-        RequiredAcks:           sarama.WaitForAll,
-        ReturnSuccesses:        true,
-        Timeout:                time.Second, // Maximum request processing time on the server side
-        MaxMessageBytes:        131072, // CDMQ set up
-        FlushMessages:          0,
-        FlushMaxMessages:       0,
-        FlushBytes:             0,
-        FlushFrequency:         0,
-        BatchConsumeCount:      0,
-        BatchFlush:             2 * time.Second,
-        ScramClient:            nil,
-        MaxRetry:               0, // Unlimited retries, compatible with historical situations
-        NetMaxOpenRequests:     5,
-        MaxProcessingTime:      100 * time.Millisecond,
-        NetDailTimeout:         30 * time.Second,
-        NetReadTimeout:         30 * time.Second,
-        NetWriteTimeout:        30 * time.Second,
-        GroupSessionTimeout:    10 * time.Second,
-        GroupRebalanceTimeout:  60 * time.Second,
-        GroupRebalanceRetryMax: 4,
-        IsolationLevel:         0,
+        MaxWaitTime:                    time.Second,
+        RequiredAcks:                   sarama.WaitForAll,
+        ReturnSuccesses:                true,
+        Timeout:                        time.Second, // Maximum request processing time on the server side
+        MaxMessageBytes:                131072, // CDMQ set up
+        FlushMessages:                  0,
+        FlushMaxMessages:               0,
+        FlushBytes:                     0,
+        FlushFrequency:                 0,
+        BatchConsumeCount:              0,
+        BatchFlush:                     2 * time.Second,
+        ScramClient:                    nil,
+        MaxRetry:                       0, // Unlimited retries, compatible with historical situations
+        NetMaxOpenRequests:             5,
+        MaxProcessingTime:              100 * time.Millisecond,
+        NetDailTimeout:                 30 * time.Second,
+        NetReadTimeout:                 30 * time.Second,
+        NetWriteTimeout:                30 * time.Second,
+        GroupSessionTimeout:            10 * time.Second,
+        GroupRebalanceTimeout:          60 * time.Second,
+        GroupRebalanceRetryMax:         4,
+        MetadataRetryMax:               1,
+        MetadataRetryBackoff:           1000 * time.Millisecond,
+        MetadataRefreshFrequency:       600 * time.Second,
+        MetadataFull:                   false, // disable pull all metadata
+        MetadataAllowAutoTopicCreation: true,
+        IsolationLevel:                 0,
         // Message consumption error retry interval The default is 3s The unit of this parameter is time.Millisecond
         RetryInterval: 3000 * time.Millisecond,
         // production retries the default configuration to align with the default configuration of sarama.NewConfig
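Not part of the diff: an in-package sketch (the function name exampleDefaultMetadata is hypothetical, and getServerConfig is unexported) of how the new defaults flow into sarama. They reproduce the previously hard-coded values, and the refresh-frequency and auto-topic-creation defaults match sarama.NewConfig's 10 minutes and true, so default behavior should be unchanged unless a caller overrides these fields.

// Hypothetical in-package illustration; not part of this change.
func exampleDefaultMetadata() {
    uc := GetDefaultConfig()
    sc := uc.getServerConfig()

    // With the defaults above this yields the same metadata settings as before:
    //   sc.Metadata.Retry.Max              == 1
    //   sc.Metadata.Retry.Backoff          == time.Second
    //   sc.Metadata.RefreshFrequency       == 600 * time.Second (sarama's default of 10 minutes)
    //   sc.Metadata.Full                   == false
    //   sc.Metadata.AllowAutoTopicCreation == true
    _ = sc
}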
47 changes: 47 additions & 0 deletions kafka/config_parser.go
@@ -24,6 +24,7 @@ func newConfigParsers() map[string]configParseFunc {
     parserBasicConfig(m)
     parserAdvanceConfig(m)
     parserBatchConfig(m)
+    parserMetadataConfig(m)
     parserAuthConfig(m)
     parserDiscoverConfig(m)
     return m
@@ -218,6 +219,52 @@ func parserBatchConfig(m map[string]configParseFunc) {
     }
 }
 
+func parserMetadataConfig(m map[string]configParseFunc) {
+    m["metadataRetryMax"] = func(config *UserConfig, s string) error {
+        metadataRetryMax, err := strconv.Atoi(s)
+        if err != nil {
+            return err
+        }
+        if metadataRetryMax < 0 {
+            return errors.New("param not support: metadataRetryMax expect a value of no less than 0")
+        }
+        config.MetadataRetryMax = metadataRetryMax
+        return nil
+    }
+    m["metadataRetryBackoff"] = func(config *UserConfig, s string) error {
+        metadataRetryBackoff, err := strconv.Atoi(s)
+        if err != nil {
+            return err
+        }
+        if metadataRetryBackoff < 0 {
+            return errors.New("param not support: metadataRetryBackoff expect a value of no less than 0")
+        }
+        config.MetadataRetryBackoff = time.Duration(metadataRetryBackoff) * time.Millisecond
+        return nil
+    }
+    m["metadataRefreshFrequency"] = func(config *UserConfig, s string) error {
+        metadataRefreshFrequency, err := strconv.Atoi(s)
+        if err != nil {
+            return err
+        }
+        if metadataRefreshFrequency < 0 {
+            return errors.New("param not support: metadataRefreshFrequency expect a value of no less than 0")
+        }
+        config.MetadataRefreshFrequency = time.Duration(metadataRefreshFrequency) * time.Second
+        return nil
+    }
+    m["metadataFull"] = func(config *UserConfig, s string) error {
+        var err error
+        config.MetadataFull, err = strconv.ParseBool(s)
+        return err
+    }
+    m["metadataAllowAutoTopicCreation"] = func(config *UserConfig, s string) error {
+        var err error
+        config.MetadataAllowAutoTopicCreation, err = strconv.ParseBool(s)
+        return err
+    }
+}
+
 // parserAuthConfig returns configParseFunc for kafka auth,
 // These parameter items can assign the user's input configuration to userConfig.
 // The parameter items may validate the content of the user's input during execution,
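Not part of the diff: a short in-package sketch (the function name exampleMetadataParsers is hypothetical; newConfigParsers and configParseFunc are unexported) showing how the new parse functions behave, including the unit handling and the negative-value validation.

// Hypothetical in-package illustration; not part of this change.
func exampleMetadataParsers() error {
    m := newConfigParsers()
    cfg := GetDefaultConfig()

    // "250" is interpreted as milliseconds; on success,
    // cfg.MetadataRetryBackoff becomes 250 * time.Millisecond.
    if err := m["metadataRetryBackoff"](cfg, "250"); err != nil {
        return err
    }

    // Negative values are rejected with a descriptive error:
    // "param not support: metadataRetryMax expect a value of no less than 0".
    err := m["metadataRetryMax"](cfg, "-1")
    return err
}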
29 changes: 28 additions & 1 deletion kafka/config_test.go
@@ -24,6 +24,11 @@ func TestParseAddress(t *testing.T) {
     assert.Equal(t, 54455, cfg.FetchMax)
     assert.Equal(t, 35326236, cfg.MaxMessageBytes)
     assert.Equal(t, sarama.OffsetOldest, cfg.Initial)
+    assert.Equal(t, 10, cfg.FlushMessages)
+    assert.Equal(t, 100, cfg.FlushMaxMessages)
+    assert.Equal(t, 10000000, cfg.FlushBytes)
+    assert.Equal(t, 100*time.Millisecond, cfg.FlushFrequency)
+    assert.Equal(t, true, cfg.Idempotent)
 
     RegisterAddrConfig("test_registered", cfg)
 
@@ -36,8 +41,10 @@ func Test_parseAddress(t *testing.T) {
 func Test_parseAddress(t *testing.T) {
     address := "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&user=kafka_test&password=cccaaabb&mechanism=SCRAM-SHA-512"
     _, err := ParseAddress(address)
+    assert.Nil(t, err)
     address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&protocol=SASL_SSL&user=kafka_test&password=cccaaabb&mechanism=SCRAM-SHA-512"
     _, err = ParseAddress(address)
+    assert.Nil(t, err)
     address = "127.0.0.1:9092?topics=Topic1,Topic2,Topic3&clientid=client1&version=0.10.2.0&strategy=sticky&batch=2&batchFlush=3000&group=test&maxRetry=10"
     _, err = ParseAddress(address)
     assert.Nil(t, err)
@@ -72,7 +79,27 @@ func Test_parseAddressWithmaxWaitTime(t *testing.T) {
     assert.Nil(t, err)
     assert.Equal(t, 250*time.Millisecond, conf.MaxWaitTime)
     address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&maxWaitTime=test"
-    conf, err = ParseAddress(address)
+    _, err = ParseAddress(address)
     assert.NotNil(t, err)
 }
 
+func Test_parseAddressWithMetadata(t *testing.T) {
+    address := "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&metadataRetryMax=3&metadataRetryBackoff=10&metadataRefreshFrequency=100&metadataFull=true&metadataAllowAutoTopicCreation=true"
+    conf, err := ParseAddress(address)
+    assert.Nil(t, err)
+    assert.Equal(t, 3, conf.MetadataRetryMax)
+    assert.Equal(t, 10*time.Millisecond, conf.MetadataRetryBackoff)
+    assert.Equal(t, 100*time.Second, conf.MetadataRefreshFrequency)
+    assert.Equal(t, true, conf.MetadataFull)
+    assert.Equal(t, true, conf.MetadataAllowAutoTopicCreation)
+    address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&metadataRetryMax=-3&metadataRetryBackoff=10&metadataRefreshFrequency=100"
+    _, err = ParseAddress(address)
+    assert.NotNil(t, err)
+    address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&metadataRetryMax=3&metadataRetryBackoff=-10&metadataRefreshFrequency=100"
+    _, err = ParseAddress(address)
+    assert.NotNil(t, err)
+    address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&metadataRetryMax=3&metadataRetryBackoff=10&metadataRefreshFrequency=-100"
+    _, err = ParseAddress(address)
+    assert.NotNil(t, err)
+}
 