Skip to content

Commit

Permalink
allow setting max messages in kafka producer queue
Browse files Browse the repository at this point in the history
  • Loading branch information
DocSavage committed Oct 25, 2019
1 parent b9fc279 commit 5086b91
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
3 changes: 3 additions & 0 deletions scripts/distro-files/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ topicActivity = "allMyActivity"
topicPrefix = "postsFromServer1"
# optional: forces topic suffix for instance mutations; each entry is data UUID : suffix.
topicSuffixes = ["bc95398cb3ae40fcab2529c7bca1ad0d:myGreatDataInstance"]
# optional: allows setting of max # messages on producer queue (queue.buffering.max.messages)
# default is 100,000 and may be exceeded for activity logs on very busy servers
bufferSize = 1000000

servers = ["foo.bar.com:1234", "foo2.bar.com:1234"]

Expand Down
8 changes: 6 additions & 2 deletions storage/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type KafkaConfig struct {
TopicPrefix string // if supplied, will be prefixed to any mutation logging
TopicSuffixes []string // optional topic suffixes per data UUID
Servers []string
BufferSize int // queue.buffering.max.messages
}

// KafkaTopicSuffix returns any configured suffix for the given data UUID or the empty string.
Expand Down Expand Up @@ -83,11 +84,14 @@ func (kc KafkaConfig) Initialize(hostID string) error {
}
kafkaActivityTopic = reg.ReplaceAllString(kafkaActivityTopic, "-")

configMap := &kafka.ConfigMap{
configMap := kafka.ConfigMap{
"client.id": "dvid-kafkaclient",
"bootstrap.servers": strings.Join(kc.Servers, ","),
}
if kafkaProducer, err = kafka.NewProducer(configMap); err != nil {
if kc.BufferSize != 0 {
configMap["queue.buffering.max.messages"] = kc.BufferSize
}
if kafkaProducer, err = kafka.NewProducer(&configMap); err != nil {
return err
}

Expand Down

0 comments on commit 5086b91

Please sign in to comment.