diff --git a/consumergroup/consumer_group.go b/consumergroup/consumer_group.go index db3eb6c..4b01037 100644 --- a/consumergroup/consumer_group.go +++ b/consumergroup/consumer_group.go @@ -82,8 +82,18 @@ type ConsumerGroup struct { offsetManager OffsetManager } -// Connects to a consumer group, using Zookeeper for auto-discovery +type ConsumerFilter func(msg *sarama.ConsumerMessage) bool + func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) { + return joinConsumerGroup(name, topics, zookeeper, nil, config) +} + +func JoinConsumerGroupWithFilter(name string, topics []string, filters map[string]ConsumerFilter, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) { + return joinConsumerGroup(name, topics, zookeeper, filters, config) +} + +// Connects to a consumer group, using Zookeeper for auto-discovery +func joinConsumerGroup(name string, topics []string, zookeeper []string, filters map[string]ConsumerFilter, config *Config) (cg *ConsumerGroup, err error) { if name == "" { return nil, sarama.ConfigurationError("Empty consumergroup name") @@ -176,7 +186,7 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval} cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig) - go cg.topicListConsumer(topics) + go cg.topicListConsumer(topics, filters) return } @@ -250,7 +260,7 @@ func (cg *ConsumerGroup) FlushOffsets() error { return cg.offsetManager.Flush() } -func (cg *ConsumerGroup) topicListConsumer(topics []string) { +func (cg *ConsumerGroup) topicListConsumer(topics []string, filters map[string]ConsumerFilter) { for { select { case <-cg.stopper: @@ -271,7 +281,7 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) { for _, topic := range topics { cg.wg.Add(1) - go cg.topicConsumer(topic, cg.messages, cg.errors, stopper) + go cg.topicConsumer(topic, cg.messages, cg.errors, stopper, filters[topic]) } select { @@ -299,7 +309,7 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) { } } -func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- error, stopper <-chan struct{}) { +func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- error, stopper <-chan struct{}, filter ConsumerFilter) { defer cg.wg.Done() select { @@ -342,7 +352,7 @@ func (cg *ConsumerGroup) topicConsumer(topic string, messages chan<- *sarama.Con for _, pid := range myPartitions { wg.Add(1) - go cg.partitionConsumer(topic, pid.ID, messages, errors, &wg, stopper) + go cg.partitionConsumer(topic, pid.ID, messages, errors, &wg, stopper, filter) } wg.Wait() @@ -374,7 +384,7 @@ func (cg *ConsumerGroup) consumePartition(topic string, partition int32, nextOff } // Consumes a partition -func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- error, wg *sync.WaitGroup, stopper <-chan struct{}) { +func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messages chan<- *sarama.ConsumerMessage, errors chan<- error, wg *sync.WaitGroup, stopper <-chan struct{}, filter ConsumerFilter) { defer wg.Done() select { @@ -494,6 +504,10 @@ partitionConsumerLoop: } + if filter != nil && !filter(message) { + continue partitionConsumerLoop + } + for { select { case <-stopper: