From 08a850b52c79a9a1b6e457233bd11bf7ba713178 Mon Sep 17 00:00:00 2001 From: Jason Newman Date: Mon, 23 Mar 2015 23:54:27 -0600 Subject: [PATCH] Check updateRDY return error, clear backoffDuration on success Check updateRDY return error, clear backoffDuration on success in backoff, check error status of updateRDY --- consumer.go | 15 ++++++++++++--- errors.go | 3 +++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/consumer.go b/consumer.go index dad367b7..9e750aed 100644 --- a/consumer.go +++ b/consumer.go @@ -691,9 +691,9 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) { } func (r *Consumer) backoff() { - atomic.StoreInt64(&r.backoffDuration, 0) if atomic.LoadInt32(&r.stopFlag) == 1 { + atomic.StoreInt64(&r.backoffDuration, 0) return } @@ -713,7 +713,16 @@ func (r *Consumer) backoff() { "(%s) backoff timeout expired, sending RDY 1", choice.String()) // while in backoff only ever let 1 message at a time through - r.updateRDY(choice, 1) + err := r.updateRDY(choice, 1) + if err != nil { + r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err) + backoffDuration := 1 * time.Second + atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds()) + time.AfterFunc(backoffDuration, r.backoff) + return + } + + atomic.StoreInt64(&r.backoffDuration, 0) } func (r *Consumer) onConnResponse(c *Conn, data []byte) { @@ -865,7 +874,7 @@ exit: func (r *Consumer) updateRDY(c *Conn, count int64) error { if c.IsClosing() { - return nil + return ErrClosing } // never exceed the nsqd's configured max RDY count diff --git a/errors.go b/errors.go index 7e1cd63d..2f228d10 100644 --- a/errors.go +++ b/errors.go @@ -13,6 +13,9 @@ var ErrNotConnected = errors.New("not connected") // made against a Producer that has been stopped var ErrStopped = errors.New("stopped") +// ErrClosing is returned when a connection is closing +var ErrClosing = errors.New("closing") + // ErrAlreadyConnected is returned from ConnectToNSQD when already connected var ErrAlreadyConnected = errors.New("already connected")