Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
crossoverJie committed Mar 13, 2024
1 parent 3935fd7 commit c50d119
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 51 deletions.
16 changes: 12 additions & 4 deletions pulsar/internal/backoff.go → pulsar/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package internal
package backoff

import (
"math/rand"
Expand All @@ -26,10 +26,14 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

// BackoffPolicy parameterize the following options in the reconnection logic to
// Policy parameterizes the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
type BackoffPolicy interface {
type Policy interface {
// Next returns the delay to wait before next retry
Next() time.Duration

// IsMaxBackoffReached reports whether the maximum backoff has been reached, given the latest delay and the total delay accumulated so far
IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool
}

// DefaultBackoff computes the delay before retrying an action.
Expand All @@ -38,6 +42,10 @@ type DefaultBackoff struct {
backoff time.Duration
}

// NewDefaultBackoff builds a DefaultBackoff whose backoff field is set to the
// given duration and returns it as a Policy.
func NewDefaultBackoff(backoff time.Duration) Policy {
	policy := DefaultBackoff{backoff: backoff}
	return &policy
}

const maxBackoff = 60 * time.Second

// Next returns the delay to wait before next retry
Expand All @@ -58,6 +66,6 @@ func (b *DefaultBackoff) Next() time.Duration {
}

// IsMaxBackoffReached reports whether the current backoff has reached maxBackoff
func (b *DefaultBackoff) IsMaxBackoffReached() bool {
func (b *DefaultBackoff) IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool {
return b.backoff >= maxBackoff
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package internal
package backoff

import (
"testing"
Expand All @@ -35,14 +35,14 @@ func TestBackoff_NextExponentialBackoff(t *testing.T) {
backoff := &DefaultBackoff{}
previousDelay := backoff.Next()
// the last value before capping to the max value is 51.2 s (.1, .2, .4, .8, 1.6, 3.2, 6.4, 12.8, 25.6, 51.2)
for previousDelay < 51*time.Second {
for previousDelay < 40*time.Second {
delay := backoff.Next()
// the jitter introduces at most 20% difference so at least delay is 1.6=(1-0.2)*2 bigger
assert.GreaterOrEqual(t, int64(delay), int64(1.6*float64(previousDelay)))
// the jitter introduces at most 20% difference so delay is less than twice the previous value
assert.LessOrEqual(t, int64(float64(delay)*.8), int64(2*float64(previousDelay)))
previousDelay = delay
assert.Equal(t, false, backoff.IsMaxBackoffReached())
assert.Equal(t, false, backoff.IsMaxBackoffReached(delay, time.Second))
}
}

Expand All @@ -55,7 +55,7 @@ func TestBackoff_NextMaxValue(t *testing.T) {

cappedDelay := backoff.Next()
assert.GreaterOrEqual(t, int64(cappedDelay), int64(maxBackoff))
assert.Equal(t, true, backoff.IsMaxBackoffReached())
assert.Equal(t, true, backoff.IsMaxBackoffReached(cappedDelay, time.Second))
// max value is 60 seconds + 20% jitter = 72 seconds
assert.LessOrEqual(t, int64(cappedDelay), int64(72*time.Second))
}
4 changes: 2 additions & 2 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/backoff"
)

// ConsumerMessage represents a pair of a Consumer and Message.
Expand Down Expand Up @@ -198,7 +198,7 @@ type ConsumerOptions struct {

// BackoffPolicy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
BackoffPolicy internal.BackoffPolicy
BackoffPolicy backoff.Policy

// Decryption represents the encryption related fields required by the consumer to decrypt a message.
Decryption *MessageDecryptionInfo
Expand Down
36 changes: 20 additions & 16 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"google.golang.org/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/crypto"
Expand Down Expand Up @@ -106,7 +108,7 @@ type partitionConsumerOpts struct {
disableForceTopicCreation bool
interceptors ConsumerInterceptors
maxReconnectToBroker *uint
backoffPolicy internal.BackoffPolicy
backoffPolicy backoff.Policy
keySharedPolicy *KeySharedPolicy
schema Schema
decryption *MessageDecryptionInfo
Expand Down Expand Up @@ -574,11 +576,11 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
return nil, errors.New("failed to getLastMessageID for the closing or closed consumer")
}
remainTime := pc.client.operationTimeout
var backoff internal.BackoffPolicy
var bo backoff.Policy
if pc.options.backoffPolicy != nil {
backoff = pc.options.backoffPolicy
bo = pc.options.backoffPolicy
} else {
backoff = &internal.DefaultBackoff{}
bo = &backoff.DefaultBackoff{}
}
request := func() (*trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
Expand All @@ -597,7 +599,7 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
pc.log.WithError(err).Error("Failed to getLastMessageID")
return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
}
nextDelay := backoff.Next()
nextDelay := bo.Next()
if nextDelay > remainTime {
nextDelay = remainTime
}
Expand Down Expand Up @@ -1653,18 +1655,23 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}

func (pc *partitionConsumer) reconnectToBroker() {
var maxRetry int
var (
maxRetry int
delayReconnectTime, totalDelayReconnectTime time.Duration
)

if pc.options.maxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*pc.options.maxReconnectToBroker)
}

var (
delayReconnectTime time.Duration
defaultBackoff = internal.DefaultBackoff{}
)
var bo backoff.Policy
if pc.options.backoffPolicy != nil {
bo = pc.options.backoffPolicy
} else {
bo = &backoff.DefaultBackoff{}
}

for maxRetry != 0 {
if pc.getConsumerState() != consumerReady {
Expand All @@ -1673,11 +1680,8 @@ func (pc *partitionConsumer) reconnectToBroker() {
return
}

if pc.options.backoffPolicy == nil {
delayReconnectTime = defaultBackoff.Next()
} else {
delayReconnectTime = pc.options.backoffPolicy.Next()
}
delayReconnectTime = bo.Next()
totalDelayReconnectTime += delayReconnectTime

pc.log.Info("Reconnecting to broker in ", delayReconnectTime)
time.Sleep(delayReconnectTime)
Expand Down Expand Up @@ -1707,7 +1711,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
maxRetry--
}
pc.metrics.ConsumersReconnectFailure.Inc()
if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
if maxRetry == 0 || bo.IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime) {
pc.metrics.ConsumersReconnectMaxRetry.Inc()
}
}
Expand Down
7 changes: 4 additions & 3 deletions pulsar/dlq_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"fmt"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsar/log"
)

Expand Down Expand Up @@ -155,7 +156,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
}

// Retry to create producer indefinitely
backoff := &internal.DefaultBackoff{}
bo := &backoff.DefaultBackoff{}
for {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.DeadLetterTopic
Expand All @@ -173,7 +174,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {

if err != nil {
r.log.WithError(err).Error("Failed to create DLQ producer")
time.Sleep(backoff.Next())
time.Sleep(bo.Next())
continue
} else {
r.producer = producer
Expand Down
6 changes: 4 additions & 2 deletions pulsar/internal/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"path"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsar/auth"

"github.com/apache/pulsar-client-go/pulsar/log"
Expand Down Expand Up @@ -148,12 +150,12 @@ func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]str
if _, ok := err.(*url.Error); ok {
// We can retry this kind of requests over a connection error because they're
// not specific to a particular broker.
backoff := DefaultBackoff{100 * time.Millisecond}
bo := backoff.NewDefaultBackoff(100 * time.Millisecond)
startTime := time.Now()
var retryTime time.Duration

for time.Since(startTime) < c.requestTimeout {
retryTime = backoff.Next()
retryTime = bo.Next()
c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
time.Sleep(retryTime)
_, err = c.GetWithQueryParams(endpoint, obj, params, true)
Expand Down
6 changes: 4 additions & 2 deletions pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync/atomic"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsar/log"

pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
Expand Down Expand Up @@ -91,7 +93,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_
var host *url.URL
var rpcResult *RPCResult
startTime := time.Now()
backoff := DefaultBackoff{100 * time.Millisecond}
bo := backoff.NewDefaultBackoff(100 * time.Millisecond)
// we can retry these requests because this kind of request is
// not specific to any particular broker
for time.Since(startTime) < c.requestTimeout {
Expand All @@ -106,7 +108,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_
break
}

retryTime := backoff.Next()
retryTime := bo.Next()
c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
time.Sleep(retryTime)
}
Expand Down
4 changes: 2 additions & 2 deletions pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/backoff"
)

type HashingScheme int
Expand Down Expand Up @@ -173,7 +173,7 @@ type ProducerOptions struct {

// BackoffPolicy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
BackoffPolicy internal.BackoffPolicy
BackoffPolicy backoff.Policy

// BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder)
// This will be used to create batch container when batching is enabled.
Expand Down
27 changes: 16 additions & 11 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"sync/atomic"
"time"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto"

Expand Down Expand Up @@ -410,17 +412,22 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVer
}

func (p *partitionProducer) reconnectToBroker() {
var maxRetry int
var (
maxRetry int
delayReconnectTime, totalDelayReconnectTime time.Duration
)
if p.options.MaxReconnectToBroker == nil {
maxRetry = -1
} else {
maxRetry = int(*p.options.MaxReconnectToBroker)
}

var (
delayReconnectTime time.Duration
defaultBackoff = internal.DefaultBackoff{}
)
// Pick the backoff policy for this reconnect attempt: the one configured in
// the producer options when present, otherwise the default backoff.
// A nil policy must never end up in bo, because bo.Next() is called
// unconditionally inside the retry loop below.
var bo backoff.Policy
if p.options.BackoffPolicy != nil {
	bo = p.options.BackoffPolicy
} else {
	bo = &backoff.DefaultBackoff{}
}

for maxRetry != 0 {
if p.getProducerState() != producerReady {
Expand All @@ -429,11 +436,9 @@ func (p *partitionProducer) reconnectToBroker() {
return
}

if p.options.BackoffPolicy == nil {
delayReconnectTime = defaultBackoff.Next()
} else {
delayReconnectTime = p.options.BackoffPolicy.Next()
}
delayReconnectTime = bo.Next()
totalDelayReconnectTime += delayReconnectTime

p.log.Info("Reconnecting to broker in ", delayReconnectTime)
time.Sleep(delayReconnectTime)

Expand Down Expand Up @@ -482,7 +487,7 @@ func (p *partitionProducer) reconnectToBroker() {
maxRetry--
}
p.metrics.ProducersReconnectFailure.Inc()
if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
if maxRetry == 0 || bo.IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime) {
p.metrics.ProducersReconnectMaxRetry.Inc()
}
}
Expand Down
4 changes: 2 additions & 2 deletions pulsar/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/backoff"
)

// ReaderMessage packages Reader and Message as a struct to use.
Expand Down Expand Up @@ -91,7 +91,7 @@ type ReaderOptions struct {

// BackoffPolicy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
BackoffPolicy internal.BackoffPolicy
BackoffPolicy backoff.Policy

// MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
MaxPendingChunkedMessage int
Expand Down
3 changes: 3 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,9 @@ func (b *testBackoffPolicy) Next() time.Duration {

return b.curBackoff
}
// IsMaxBackoffReached reports whether the given reconnect delay is at least
// this policy's maxBackoff. The cumulative delay argument is not consulted.
func (b *testBackoffPolicy) IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool {
	reached := delayReconnectTime >= b.maxBackoff
	return reached
}

func (b *testBackoffPolicy) IsExpectedIntervalFrom(startTime time.Time) bool {
// Approximately equal to expected interval
Expand Down
Loading

0 comments on commit c50d119

Please sign in to comment.