diff --git a/scripts/distro-files/config-full.toml b/scripts/distro-files/config-full.toml index 7d6c3f4f..1d2b5996 100644 --- a/scripts/distro-files/config-full.toml +++ b/scripts/distro-files/config-full.toml @@ -138,6 +138,9 @@ topicActivity = "allMyActivity" topicPrefix = "postsFromServer1" # optional: forces topic suffix for instance mutations; each entry is data UUID : suffix. topicSuffixes = ["bc95398cb3ae40fcab2529c7bca1ad0d:myGreatDataInstance"] +# optional: allows setting of max # messages on producer queue (queue.buffering.max.messages) +# default is 100,000 and may be exceeded for activity logs on very busy servers +bufferSize = 1000000 servers = ["foo.bar.com:1234", "foo2.bar.com:1234"] diff --git a/storage/kafka.go b/storage/kafka.go index b5c81ed1..40d78226 100644 --- a/storage/kafka.go +++ b/storage/kafka.go @@ -39,6 +39,7 @@ type KafkaConfig struct { TopicPrefix string // if supplied, will be prefixed to any mutation logging TopicSuffixes []string // optional topic suffixes per data UUID Servers []string + BufferSize int // queue.buffering.max.messages } // KafkaTopicSuffix returns any configured suffix for the given data UUID or the empty string. @@ -83,11 +84,14 @@ func (kc KafkaConfig) Initialize(hostID string) error { } kafkaActivityTopic = reg.ReplaceAllString(kafkaActivityTopic, "-") - configMap := &kafka.ConfigMap{ + configMap := kafka.ConfigMap{ "client.id": "dvid-kafkaclient", "bootstrap.servers": strings.Join(kc.Servers, ","), } - if kafkaProducer, err = kafka.NewProducer(configMap); err != nil { + if kc.BufferSize != 0 { + configMap["queue.buffering.max.messages"] = kc.BufferSize + } + if kafkaProducer, err = kafka.NewProducer(&configMap); err != nil { return err }