Skip to content

Commit

Permalink
Check updateRDY return error, clear backoffDuration on success
Browse files Browse the repository at this point in the history
Check updateRDY return error, clear backoffDuration on success

in backoff, check error status of updateRDY
  • Loading branch information
Jason Newman committed Mar 24, 2015
1 parent f0c950f commit 08a850b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
15 changes: 12 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,9 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) {
}

func (r *Consumer) backoff() {
atomic.StoreInt64(&r.backoffDuration, 0)

if atomic.LoadInt32(&r.stopFlag) == 1 {
atomic.StoreInt64(&r.backoffDuration, 0)
return
}

Expand All @@ -713,7 +713,16 @@ func (r *Consumer) backoff() {
"(%s) backoff timeout expired, sending RDY 1",
choice.String())
// while in backoff only ever let 1 message at a time through
r.updateRDY(choice, 1)
err := r.updateRDY(choice, 1)
if err != nil {
r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err)
backoffDuration := 1 * time.Second
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, r.backoff)
return
}

atomic.StoreInt64(&r.backoffDuration, 0)
}

func (r *Consumer) onConnResponse(c *Conn, data []byte) {
Expand Down Expand Up @@ -865,7 +874,7 @@ exit:

func (r *Consumer) updateRDY(c *Conn, count int64) error {
if c.IsClosing() {
return nil
return ErrClosing
}

// never exceed the nsqd's configured max RDY count
Expand Down
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ var ErrNotConnected = errors.New("not connected")
// made against a Producer that has been stopped
var ErrStopped = errors.New("stopped")

// ErrClosing is returned when a connection is closing
var ErrClosing = errors.New("closing")

// ErrAlreadyConnected is returned from ConnectToNSQD when already connected
var ErrAlreadyConnected = errors.New("already connected")

Expand Down

0 comments on commit 08a850b

Please sign in to comment.