-
Notifications
You must be signed in to change notification settings - Fork 173
/
builders.go
98 lines (81 loc) · 3.84 KB
/
builders.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package goka
import (
"hash"
"github.com/IBM/sarama"
)
// ProducerBuilder is the signature of a function that creates a Kafka
// Producer from a list of brokers, a client ID and a constructor for a
// 32-bit hash.
type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)
// DefaultProducerBuilder builds a Kafka Producer using the Sarama library.
// It starts from a copy of the package-level globalConfig, sets clientID and
// a custom hash partitioner constructed from hasher on that copy, and hands
// the result to NewProducer.
func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
	// Copy by value so globalConfig itself is never modified.
	cfg := globalConfig
	cfg.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
	cfg.ClientID = clientID
	return NewProducer(brokers, &cfg)
}
// ProducerBuilderWithConfig returns a ProducerBuilder that creates a Kafka
// producer using the Sarama library and the passed config.
//
// On every invocation the returned builder takes a shallow copy of config,
// sets the client ID and a custom hash partitioner (built from hasher) on the
// copy, and passes the copy to NewProducer. The caller's config is never
// modified, so the same *sarama.Config can safely be shared between several
// builders without one overwriting the ClientID or partitioner of another.
//
// config must not be nil.
func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder {
	return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
		// Copy at call time so changes made to config before the builder
		// is invoked are still picked up, while our own changes stay local.
		cfg := *config
		cfg.ClientID = clientID
		cfg.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
		return NewProducer(brokers, &cfg)
	}
}
// TopicManagerBuilder is the signature of a function that creates a
// TopicManager for the given brokers. The TopicManager is used to check
// partition counts and create tables.
type TopicManagerBuilder func(brokers []string) (TopicManager, error)
// DefaultTopicManagerBuilder builds a TopicManager using the Sarama library.
// It uses a copy of the package-level globalConfig with the client ID set to
// "goka-topic-manager" and the configuration returned by
// NewTopicManagerConfig.
func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error) {
	// Copy by value so globalConfig itself is never modified.
	cfg := globalConfig
	cfg.ClientID = "goka-topic-manager"
	tmCfg := NewTopicManagerConfig()
	return NewTopicManager(brokers, &cfg, tmCfg)
}
// TopicManagerBuilderWithConfig returns a TopicManagerBuilder that passes the
// given Sarama config and topic manager config unchanged to NewTopicManager.
func TopicManagerBuilderWithConfig(config *sarama.Config, tmConfig *TopicManagerConfig) TopicManagerBuilder {
	build := func(brokers []string) (TopicManager, error) {
		return NewTopicManager(brokers, config, tmConfig)
	}
	return build
}
// TopicManagerBuilderWithTopicManagerConfig returns a TopicManagerBuilder
// that combines the given topic manager config with a copy of the
// package-level globalConfig whose client ID is set to "goka-topic-manager".
func TopicManagerBuilderWithTopicManagerConfig(tmConfig *TopicManagerConfig) TopicManagerBuilder {
	build := func(brokers []string) (TopicManager, error) {
		// globalConfig is read when the builder runs, not when it is created.
		cfg := globalConfig
		cfg.ClientID = "goka-topic-manager"
		return NewTopicManager(brokers, &cfg, tmConfig)
	}
	return build
}
// ConsumerGroupBuilder is the signature of a function that creates a
// `sarama.ConsumerGroup` from a list of brokers, a group name and a client ID.
type ConsumerGroupBuilder func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)
// DefaultConsumerGroupBuilder builds a Kafka consumer group using the Sarama
// library. It uses a copy of the package-level globalConfig with the client
// ID set to clientID.
func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) {
	// Copy by value so globalConfig itself is never modified.
	cfg := globalConfig
	cfg.ClientID = clientID
	return sarama.NewConsumerGroup(brokers, group, &cfg)
}
// ConsumerGroupBuilderWithConfig returns a ConsumerGroupBuilder that creates
// a Sarama consumer group using the passed config.
//
// On every invocation the returned builder takes a shallow copy of config,
// sets the client ID on the copy and passes the copy to
// sarama.NewConsumerGroup. The caller's config is never modified, so the same
// *sarama.Config can safely be shared between several builders without one
// overwriting the ClientID of another.
//
// config must not be nil.
func ConsumerGroupBuilderWithConfig(config *sarama.Config) ConsumerGroupBuilder {
	return func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) {
		// Copy at call time so changes made to config before the builder
		// is invoked are still picked up, while our own change stays local.
		cfg := *config
		cfg.ClientID = clientID
		return sarama.NewConsumerGroup(brokers, group, &cfg)
	}
}
// SaramaConsumerBuilder is the signature of a function that creates a
// `sarama.Consumer` from a list of brokers and a client ID.
type SaramaConsumerBuilder func(brokers []string, clientID string) (sarama.Consumer, error)
// DefaultSaramaConsumerBuilder builds a Kafka consumer using the Sarama
// library. It uses a copy of the package-level globalConfig with the client
// ID set to clientID.
func DefaultSaramaConsumerBuilder(brokers []string, clientID string) (sarama.Consumer, error) {
	// Copy by value so globalConfig itself is never modified.
	cfg := globalConfig
	cfg.ClientID = clientID
	return sarama.NewConsumer(brokers, &cfg)
}
// SaramaConsumerBuilderWithConfig returns a SaramaConsumerBuilder that
// creates a Sarama consumer using the passed config.
//
// On every invocation the returned builder takes a shallow copy of config,
// sets the client ID on the copy and passes the copy to sarama.NewConsumer.
// The caller's config is never modified, so the same *sarama.Config can
// safely be shared between several builders without one overwriting the
// ClientID of another.
//
// config must not be nil.
func SaramaConsumerBuilderWithConfig(config *sarama.Config) SaramaConsumerBuilder {
	return func(brokers []string, clientID string) (sarama.Consumer, error) {
		// Copy at call time so changes made to config before the builder
		// is invoked are still picked up, while our own change stays local.
		cfg := *config
		cfg.ClientID = clientID
		return sarama.NewConsumer(brokers, &cfg)
	}
}
// BackoffBuilder is the signature of a function that creates a Backoff.
type BackoffBuilder func() (Backoff, error)
// DefaultBackoffBuilder returns the Backoff created by NewSimpleBackoff with
// the package defaults defaultBackoffStep and defaultBackoffMax. The returned
// error is always nil.
func DefaultBackoffBuilder() (Backoff, error) {
	backoff := NewSimpleBackoff(defaultBackoffStep, defaultBackoffMax)
	return backoff, nil
}