Skip to content

Commit

Permalink
feature:自适应重试控制策略
Browse files Browse the repository at this point in the history
  • Loading branch information
Stone-afk committed Dec 22, 2024
1 parent 5cbb328 commit 3028ec4
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 93 deletions.
35 changes: 27 additions & 8 deletions retry/adaptive.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package retry

import (
"context"
"fmt"
"math/bits"
"sync/atomic"
Expand All @@ -24,27 +23,47 @@ import (
"github.com/ecodeclub/ekit/internal/errs"
)

var _ Strategy = (*AdaptiveTimeoutRetryStrategy)(nil)

type AdaptiveTimeoutRetryStrategy struct {
strategy Strategy // 基础重试策略
threshold int // 超时比率阈值 (单位:比特数量)
ringBuffer []uint64 // 比特环(滑动窗口存储超时信息)
reqCount uint64 // 当前滑动窗口内超时的数量
ringBufferLen int // 滑动窗口长度

}

func (s *AdaptiveTimeoutRetryStrategy) Next(ctx context.Context, err error) (time.Duration, bool) {
if err == nil {
s.markSuccess()
return 0, false
}
func (s *AdaptiveTimeoutRetryStrategy) Next() (time.Duration, bool) {
failCount := s.getFailed()
s.markFail()
if failCount >= s.threshold {
return 0, false
}
return s.strategy.Next(ctx, err)
return s.strategy.Next()
}

func (s *AdaptiveTimeoutRetryStrategy) Report(err error) Strategy {
if err == nil {
s.markSuccess()
} else {
s.markFail()
}
return s
}

//func (s *AdaptiveTimeoutRetryStrategy) Next(ctx context.Context, err error) (time.Duration, bool) {
// if err == nil {
// s.markSuccess()
// return 0, false
// }
// failCount := s.getFailed()
// s.markFail()
// if failCount >= s.threshold {
// return 0, false
// }
// return s.strategy.Next(ctx, err)
//}

func (s *AdaptiveTimeoutRetryStrategy) markSuccess() {
count := atomic.AddUint64(&s.reqCount, 1)
count = count % (uint64(64) * uint64(len(s.ringBuffer)))
Expand Down
25 changes: 10 additions & 15 deletions retry/adaptive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package retry

import (
"context"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -89,32 +88,24 @@ func TestAdaptiveTimeoutRetryStrategy_Next(t *testing.T) {

tests := []struct {
name string
err error
wantDelay time.Duration
wantOk bool
}{
{
name: "error below threshold",
err: errors.New("test error"),
wantDelay: 1 * time.Second,
wantOk: true,
},
{
name: "error above threshold",
err: errors.New("test error"),
wantDelay: 1 * time.Second,
wantOk: true,
},
{
name: "not retry",
wantDelay: 0,
wantOk: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
delay, ok := strategy.Next(context.Background(), tt.err)
delay, ok := strategy.Next()
assert.Equal(t, tt.wantDelay, delay)
assert.Equal(t, tt.wantOk, ok)
})
Expand Down Expand Up @@ -147,7 +138,8 @@ func TestAdaptiveTimeoutRetryStrategy_Next_Concurrent(t *testing.T) {
if index >= 1500 {
err = mockErr
}
_, allowed := strategy.Next(context.Background(), err)
strategy.Report(err)
_, allowed := strategy.Next()
if err != nil {
// 失败请求的统计
if allowed {
Expand Down Expand Up @@ -187,11 +179,10 @@ func ExampleAdaptiveTimeoutRetryStrategy_Next() {
fmt.Println(err)
return
}
nextErr := errors.New("test error")
interval, ok := strategy.Next(context.Background(), nextErr)
interval, ok := strategy.Next()
for ok {
fmt.Println(interval)
interval, ok = strategy.Next(context.Background(), nextErr)
interval, ok = strategy.Next()
}
// Output:
// 1s
Expand All @@ -209,6 +200,10 @@ func ExampleAdaptiveTimeoutRetryStrategy_Next() {
type MockStrategy struct {
}

func (m MockStrategy) Next(ctx context.Context, err error) (time.Duration, bool) {
func (m MockStrategy) Next() (time.Duration, bool) {
return 1 * time.Second, true
}

func (m MockStrategy) Report(err error) Strategy {
return m
}
12 changes: 5 additions & 7 deletions retry/exponential.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
package retry

import (
"context"
"math"
"sync/atomic"
"time"

"github.com/ecodeclub/ekit/internal/errs"
)

var _ Strategy = (*ExponentialBackoffRetryStrategy)(nil)

// ExponentialBackoffRetryStrategy 指数退避重试
type ExponentialBackoffRetryStrategy struct {
// 初始重试间隔
Expand Down Expand Up @@ -50,14 +51,11 @@ func NewExponentialBackoffRetryStrategy(initialInterval, maxInterval time.Durati
}, nil
}

func (s *ExponentialBackoffRetryStrategy) Next(ctx context.Context, err error) (time.Duration, bool) {
if err != nil {
return s.next()
}
return 0, false
func (s *ExponentialBackoffRetryStrategy) Report(err error) Strategy {
return s
}

func (s *ExponentialBackoffRetryStrategy) next() (time.Duration, bool) {
func (s *ExponentialBackoffRetryStrategy) Next() (time.Duration, bool) {
retries := atomic.AddInt32(&s.retries, 1)
if s.maxRetries <= 0 || retries <= s.maxRetries {
if reached, ok := s.maxIntervalReached.Load().(bool); ok && reached {
Expand Down
28 changes: 7 additions & 21 deletions retry/exponential_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package retry

import (
"context"
"errors"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -84,16 +83,14 @@ func TestNewExponentialBackoffRetryStrategy_New(t *testing.T) {
func TestExponentialBackoffRetryStrategy_Next(t *testing.T) {
testCases := []struct {
name string
nextErr error
ctx context.Context
strategy *ExponentialBackoffRetryStrategy

wantIntervals []time.Duration
}{
{
name: "stop if retries reaches maxRetries",
ctx: context.Background(),
nextErr: errors.New("test error"),
name: "stop if retries reaches maxRetries",
ctx: context.Background(),
strategy: func() *ExponentialBackoffRetryStrategy {
s, err := NewExponentialBackoffRetryStrategy(1*time.Second, 10*time.Second, 3)
require.NoError(t, err)
Expand All @@ -103,9 +100,8 @@ func TestExponentialBackoffRetryStrategy_Next(t *testing.T) {
wantIntervals: []time.Duration{1 * time.Second, 2 * time.Second, 4 * time.Second},
},
{
name: "initialInterval over maxInterval",
ctx: context.Background(),
nextErr: errors.New("test error"),
name: "initialInterval over maxInterval",
ctx: context.Background(),
strategy: func() *ExponentialBackoffRetryStrategy {
s, err := NewExponentialBackoffRetryStrategy(1*time.Second, 4*time.Second, 5)
require.NoError(t, err)
Expand All @@ -114,22 +110,12 @@ func TestExponentialBackoffRetryStrategy_Next(t *testing.T) {

wantIntervals: []time.Duration{1 * time.Second, 2 * time.Second, 4 * time.Second, 4 * time.Second, 4 * time.Second},
},
{
name: "not retry",
ctx: context.Background(),
strategy: func() *ExponentialBackoffRetryStrategy {
s, err := NewExponentialBackoffRetryStrategy(1*time.Second, 4*time.Second, 5)
require.NoError(t, err)
return s
}(),
wantIntervals: []time.Duration{},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
intervals := make([]time.Duration, 0)
for {
if interval, ok := tt.strategy.Next(tt.ctx, tt.nextErr); ok {
if interval, ok := tt.strategy.Next(); ok {
intervals = append(intervals, interval)
} else {
break
Expand Down Expand Up @@ -160,10 +146,10 @@ func ExampleExponentialBackoffRetryStrategy_Next() {
fmt.Println(err)
return
}
interval, ok := retry.next()
interval, ok := retry.Next()
for ok {
fmt.Println(interval)
interval, ok = retry.next()
interval, ok = retry.Next()
}
// Output:
// 1s
Expand Down
16 changes: 7 additions & 9 deletions retry/fixed_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
package retry

import (
"context"
"sync/atomic"
"time"

"github.com/ecodeclub/ekit/internal/errs"
)

var _ Strategy = (*FixedIntervalRetryStrategy)(nil)

// FixedIntervalRetryStrategy 等间隔重试
type FixedIntervalRetryStrategy struct {
maxRetries int32 // 最大重试次数,如果是 0 或负数,表示无限重试
Expand All @@ -39,17 +40,14 @@ func NewFixedIntervalRetryStrategy(interval time.Duration, maxRetries int32) (*F
}, nil
}

func (s *FixedIntervalRetryStrategy) Next(ctx context.Context, err error) (time.Duration, bool) {
if err != nil {
return s.next()
}
return 0, false
}

func (s *FixedIntervalRetryStrategy) next() (time.Duration, bool) {
func (s *FixedIntervalRetryStrategy) Next() (time.Duration, bool) {
retries := atomic.AddInt32(&s.retries, 1)
if s.maxRetries <= 0 || retries <= s.maxRetries {
return s.interval, true
}
return 0, false
}

func (s *FixedIntervalRetryStrategy) Report(err error) Strategy {
return s
}
Loading

0 comments on commit 3028ec4

Please sign in to comment.