Skip to content

Commit

Permalink
Merge pull request #125 from mreiferson/jittered_backoff_125
Browse files Browse the repository at this point in the history
consumer: backoff strategies
  • Loading branch information
jehiah committed Mar 22, 2015
2 parents f9995d9 + 5b9ccb8 commit f0c950f
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 19 deletions.
89 changes: 81 additions & 8 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import (
"fmt"
"io/ioutil"
"log"
"math"
"math/rand"
"net"
"os"
"reflect"
"strconv"
"strings"
"sync"
"time"
"unsafe"
)
Expand All @@ -27,6 +30,61 @@ type defaultsHandler interface {
SetDefaults(c *Config) error
}

// BackoffStrategy defines a strategy for calculating the duration of time
// a consumer should backoff for a given attempt
type BackoffStrategy interface {
Calculate(attempt int) time.Duration
}

// ExponentialStrategy implements an exponential backoff strategy (default)
type ExponentialStrategy struct {
cfg *Config
}

// Calculate returns a duration of time: 2 ^ attempt
func (s *ExponentialStrategy) Calculate(attempt int) time.Duration {
backoffDuration := s.cfg.BackoffMultiplier *
time.Duration(math.Pow(2, float64(attempt)))
if backoffDuration > s.cfg.MaxBackoffDuration {
backoffDuration = s.cfg.MaxBackoffDuration
}
return backoffDuration
}

func (s *ExponentialStrategy) setConfig(cfg *Config) {
s.cfg = cfg
}

// FullJitterStrategy implements http://www.awsarchitectureblog.com/2015/03/backoff.html
type FullJitterStrategy struct {
cfg *Config

rngOnce sync.Once
rng *rand.Rand
}

// Calculate returns a random duration of time [0, 2 ^ attempt]
func (s *FullJitterStrategy) Calculate(attempt int) time.Duration {
// lazily initialize the RNG
s.rngOnce.Do(func() {
if s.rng != nil {
return
}
s.rng = rand.New(rand.NewSource(time.Now().UnixNano()))
})

backoffDuration := s.cfg.BackoffMultiplier *
time.Duration(math.Pow(2, float64(attempt)))
if backoffDuration > s.cfg.MaxBackoffDuration {
backoffDuration = s.cfg.MaxBackoffDuration
}
return time.Duration(s.rng.Intn(int(backoffDuration)))
}

func (s *FullJitterStrategy) setConfig(cfg *Config) {
s.cfg = cfg
}

// Config is a struct of NSQ options
//
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
Expand Down Expand Up @@ -59,11 +117,17 @@ type Config struct {
// Maximum duration when REQueueing (for doubling of deferred requeue)
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`

// Backoff strategy, defaults to exponential backoff. Overwrite this to define alternative backoff algrithms.
BackoffStrategy BackoffStrategy
// Maximum amount of time to backoff when processing fails 0 == no backoff
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
// Unit of time for calculating consumer backoff
BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`

// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`

// Amount of time in seconds to wait for a message from a producer when in a state where RDY
// counts are re-distributed (ie. max_in_flight < num_producers)
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`
Expand Down Expand Up @@ -108,9 +172,6 @@ type Config struct {
// Maximum number of messages to allow in flight (concurrency knob)
MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`

// Maximum amount of time to backoff when processing fails 0 == no backoff
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`

// The server-side message timeout for messages delivered to this client
MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`

Expand All @@ -122,9 +183,10 @@ type Config struct {
//
// This must be used to initialize Config structs. Values can be set directly, or through Config.Set()
func NewConfig() *Config {
c := &Config{}
c.configHandlers = append(c.configHandlers, &structTagsConfig{}, &tlsConfig{})
c.initialized = true
c := &Config{
configHandlers: []configHandler{&structTagsConfig{}, &tlsConfig{}},
initialized: true,
}
if err := c.setDefaults(); err != nil {
panic(err.Error())
}
Expand Down Expand Up @@ -170,7 +232,6 @@ func (c *Config) assertInitialized() {
// Validate checks that all values are within specified min/max ranges
func (c *Config) Validate() error {
c.assertInitialized()

for _, h := range c.configHandlers {
if err := h.Validate(c); err != nil {
return err
Expand All @@ -188,7 +249,6 @@ func (c *Config) setDefaults() error {
}
}
}

return nil
}

Expand Down Expand Up @@ -271,6 +331,7 @@ func (h *structTagsConfig) SetDefaults(c *Config) error {
log.Fatalf("ERROR: unable to get hostname %s", err.Error())
}

c.BackoffStrategy = &ExponentialStrategy{}
c.ClientID = strings.Split(hostname, ".")[0]
c.Hostname = hostname
c.UserAgent = fmt.Sprintf("go-nsq/%s", VERSION)
Expand Down Expand Up @@ -311,6 +372,18 @@ func (h *structTagsConfig) Validate(c *Config) error {
if c.HeartbeatInterval > c.ReadTimeout {
return fmt.Errorf("HeartbeatInterval %v must be less than ReadTimeout %v", c.HeartbeatInterval, c.ReadTimeout)
}

if c.BackoffStrategy == nil {
return fmt.Errorf("BackoffStrategy cannot be nil")
}

// initialize internal backoff strategies that need access to config
if v, ok := c.BackoffStrategy.(interface {
setConfig(*Config)
}); ok {
v.setConfig(c)
}

return nil
}

Expand Down
39 changes: 39 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package nsq

import (
"math/rand"
"net"
"testing"
"time"
)

func TestConfigSet(t *testing.T) {
Expand Down Expand Up @@ -53,3 +55,40 @@ func TestConfigValidate(t *testing.T) {
t.Error("no error set for invalid value")
}
}

func TestExponentialBackoff(t *testing.T) {
expected := []time.Duration{
1 * time.Second,
2 * time.Second,
8 * time.Second,
32 * time.Second,
}
backoffTest(t, expected, func(c *Config) BackoffStrategy {
return &ExponentialStrategy{cfg: c}
})
}

func TestFullJitterBackoff(t *testing.T) {
expected := []time.Duration{
566028617 * time.Nanosecond,
1365407263 * time.Nanosecond,
5232470547 * time.Nanosecond,
21467499218 * time.Nanosecond,
}
backoffTest(t, expected, func(c *Config) BackoffStrategy {
return &FullJitterStrategy{cfg: c, rng: rand.New(rand.NewSource(99))}
})
}

func backoffTest(t *testing.T, expected []time.Duration, cb func(c *Config) BackoffStrategy) {
config := NewConfig()
attempts := []int{0, 1, 3, 5}
s := cb(config)
for i := range attempts {
result := s.Calculate(attempts[i])
if result != expected[i] {
t.Fatalf("wrong backoff duration %v for attempt %d (should be %v)",
result, attempts[i], expected[i])
}
}
}
11 changes: 1 addition & 10 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) {
}
} else if r.backoffCounter > 0 {
// start or continue backoff
backoffDuration := r.backoffDurationForCount(r.backoffCounter)
backoffDuration := r.config.BackoffStrategy.Calculate(int(r.backoffCounter))
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, r.backoff)

Expand Down Expand Up @@ -814,15 +814,6 @@ func (r *Consumer) onConnClose(c *Conn) {
}
}

func (r *Consumer) backoffDurationForCount(count int32) time.Duration {
backoffDuration := r.config.BackoffMultiplier *
time.Duration(math.Pow(2, float64(count)))
if backoffDuration > r.config.MaxBackoffDuration {
backoffDuration = r.config.MaxBackoffDuration
}
return backoffDuration
}

func (r *Consumer) inBackoff() bool {
r.backoffMtx.RLock()
backoffCounter := r.backoffCounter
Expand Down
2 changes: 1 addition & 1 deletion consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestConsumerTLSClientCertViaSet(t *testing.T) {

func consumerTest(t *testing.T, cb func(c *Config)) {
config := NewConfig()
laddr := "127.0.0.2"
laddr := "127.0.0.1"
// so that the test can simulate binding consumer to specified address
config.LocalAddr, _ = net.ResolveTCPAddr("tcp", laddr+":0")
// so that the test can simulate reaching max requeues and a call to LogFailedMessage
Expand Down

0 comments on commit f0c950f

Please sign in to comment.