Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature:自适应重试控制策略 #270

Merged
merged 6 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/errs/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ func NewErrInvalidMaxIntervalValue(maxInterval, initialInterval time.Duration) e
func NewErrRetryExhausted(lastErr error) error {
return fmt.Errorf("ekit: 超过最大重试次数,业务返回的最后一个 error %w", lastErr)
}

func NewErrInvalidThresholdValue(threshold int) error {
return fmt.Errorf("ekit: 失效比率阈值 [%d]", threshold)
}
130 changes: 130 additions & 0 deletions retry/adaptive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"fmt"
"math/bits"
"sync/atomic"
"time"

"github.com/ecodeclub/ekit/internal/errs"
)

type AdaptiveTimeoutRetryStrategy struct {
strategy Strategy // 基础重试策略
threshold int // 超时比率阈值 (单位:比特数量)
ringBuffer []uint64 // 比特环(滑动窗口存储超时信息)
reqCount uint64 // 当前滑动窗口内超时的数量
flycash marked this conversation as resolved.
Show resolved Hide resolved
ringBufferLen int // 滑动窗口长度
}

func (s *AdaptiveTimeoutRetryStrategy) Next(ctx context.Context, err error) (time.Duration, bool) {
if err == nil {
s.markSuccess()
return 0, false
}
failCount := s.getFailed()
s.markFail()
if failCount >= s.threshold {
return 0, false
}
return s.strategy.Next(ctx, err)
}

func (s *AdaptiveTimeoutRetryStrategy) markSuccess() {
count := atomic.AddUint64(&s.reqCount, 1)
count = count % (uint64(64) * uint64(len(s.ringBuffer)))
idx := count / 64
Stone-afk marked this conversation as resolved.
Show resolved Hide resolved
bitPos := count % 64
for {
old := atomic.LoadUint64(&s.ringBuffer[idx])
// 检查 old 的第 bitPos 位是否为 1。如果结果为 0,表示该位为 0,即没有记录失败
if old&(uint64(1)<<bitPos) == 0 {
break
}
if atomic.CompareAndSwapUint64(&s.ringBuffer[idx], old, old&^(uint64(1)<<bitPos)) {
break
}
}
}

func (s *AdaptiveTimeoutRetryStrategy) markFail() {
count := atomic.AddUint64(&s.reqCount, 1)
count = count % (uint64(64) * uint64(len(s.ringBuffer)))
Stone-afk marked this conversation as resolved.
Show resolved Hide resolved
idx := count / 64
bitPos := count % 64
for {
old := atomic.LoadUint64(&s.ringBuffer[idx])
Stone-afk marked this conversation as resolved.
Show resolved Hide resolved
// 检查 old 的第 bitPos 位是否为1。如果结果不等于0,表示该位已经被设置为1。
if old&(uint64(1)<<bitPos) != 0 {
// 已被设置为1
break
}
// (uint64(1)<<bitPos) 将目标位设置为1
if atomic.CompareAndSwapUint64(&s.ringBuffer[idx], old, old|(uint64(1)<<bitPos)) {
break
}
}
flycash marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *AdaptiveTimeoutRetryStrategy) getFailed() int {
var failCount int
for i := 0; i < len(s.ringBuffer); i++ {
v := atomic.LoadUint64(&s.ringBuffer[i])
failCount += bits.OnesCount64(v)
}
return failCount
}

func NewAdaptiveTimeoutRetryStrategy(strategy Strategy, opts ...Option) (*AdaptiveTimeoutRetryStrategy, error) {
flycash marked this conversation as resolved.
Show resolved Hide resolved
if strategy == nil {
return nil, fmt.Errorf("ekit: strategy 不能为空")
}

res := &AdaptiveTimeoutRetryStrategy{
strategy: strategy,
}
for _, opt := range opts {
opt(res)
}

if res.ringBufferLen <= 0 {
return nil, fmt.Errorf("ekit: 无效的滑动窗口长度 [%d]", res.ringBufferLen)

}

if res.threshold <= 0 {
return nil, errs.NewErrInvalidThresholdValue(res.threshold)
}
Stone-afk marked this conversation as resolved.
Show resolved Hide resolved

res.ringBuffer = make([]uint64, res.ringBufferLen)
return res, nil
}

type Option func(*AdaptiveTimeoutRetryStrategy)

func WithRingBufferLen(l int) Option {
return func(s *AdaptiveTimeoutRetryStrategy) {
s.ringBufferLen = l
}
}

func WithThreshold(t int) Option {
return func(s *AdaptiveTimeoutRetryStrategy) {
s.threshold = t
}
}
214 changes: 214 additions & 0 deletions retry/adaptive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewAdaptiveTimeoutRetryStrategy_New(t *testing.T) {
testCases := []struct {
name string
threshold int
ringBufferLen int
strategy Strategy
want *AdaptiveTimeoutRetryStrategy
wantErr error
}{
{
name: "valid strategy and threshold",
strategy: &MockStrategy{}, // 假设有一个 MockStrategy 用于测试
threshold: 50,
ringBufferLen: 16,
want: func() *AdaptiveTimeoutRetryStrategy {
s, err := NewAdaptiveTimeoutRetryStrategy(&MockStrategy{}, WithThreshold(50), WithRingBufferLen(16))
require.NoError(t, err)
return s
}(),
wantErr: nil,
},
{
name: "threshold less than or equal to zero",
strategy: &MockStrategy{},
ringBufferLen: 16,
threshold: 0,
want: nil,
wantErr: fmt.Errorf("ekit: 失效比率阈值 [%d]", 0),
},
{
name: "ring buffer len less than or equal to zero",
strategy: &MockStrategy{},
threshold: 10,
ringBufferLen: 0,
want: nil,
wantErr: fmt.Errorf("ekit: 无效的滑动窗口长度 [%d]", 0),
},
{
name: "strategy is nil",
strategy: nil,
threshold: 10,
want: nil,
wantErr: fmt.Errorf("ekit: strategy 不能为空"),
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
s, err := NewAdaptiveTimeoutRetryStrategy(tt.strategy, WithThreshold(tt.threshold), WithRingBufferLen(tt.ringBufferLen))
assert.Equal(t, tt.wantErr, err)
assert.Equal(t, tt.want, s)
})
}
}

func TestAdaptiveTimeoutRetryStrategy_Next(t *testing.T) {
baseStrategy := &MockStrategy{}
strategy, err := NewAdaptiveTimeoutRetryStrategy(baseStrategy, WithThreshold(50), WithRingBufferLen(16))
require.NoError(t, err)

tests := []struct {
name string
err error
wantDelay time.Duration
wantOk bool
}{
{
name: "error below threshold",
err: errors.New("test error"),
wantDelay: 1 * time.Second,
wantOk: true,
},
{
name: "error above threshold",
err: errors.New("test error"),
wantDelay: 1 * time.Second,
wantOk: true,
},
{
name: "not retry",
wantDelay: 0,
wantOk: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
delay, ok := strategy.Next(context.Background(), tt.err)
assert.Equal(t, tt.wantDelay, delay)
assert.Equal(t, tt.wantOk, ok)
})
}
}

// 测试场景
// 阈值是50
// 2000个请求 有1500个成功的 有500个失败的 最后统计500个失败的有50个可以执行 有450个不能执行 1500成功的都能执行
func TestAdaptiveTimeoutRetryStrategy_Next_Concurrent(t *testing.T) {
// 创建一个基础策略
baseStrategy := &MockStrategy{}

// 创建升级版自适应策略,设置阈值为50
strategy, err := NewAdaptiveTimeoutRetryStrategy(baseStrategy,
WithThreshold(50), WithRingBufferLen(16))
assert.Nil(t, err)

var wg sync.WaitGroup
var successCount, errCount int64
mockErr := errors.New("mock error")

// 并发执行2000个请求
for i := 0; i < 2000; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
// 前1500个请求成功,后500个失败
var err error
if index >= 1500 {
err = mockErr
}
_, allowed := strategy.Next(context.Background(), err)
if err != nil {
// 失败请求的统计
if allowed {
atomic.AddInt64(&successCount, 1)
} else {
atomic.AddInt64(&errCount, 1)
}
}
}(i)
}

// 等待所有goroutine完成
wg.Wait()

// 验证结果:期望大约50个失败请求可以执行,450个被拒绝
// 由于是环形缓冲区和并发执行,可能会有一些误差,这里使用一个合理的范围进行判断
finalSuccessCount := int(atomic.LoadInt64(&successCount))
finalErrCount := int(atomic.LoadInt64(&errCount))
if finalSuccessCount < 45 || finalSuccessCount > 55 {
t.Errorf("期望大约50个失败请求被允许执行,实际允许执行的失败请求数量为: %d", finalSuccessCount)
}

if finalErrCount < 445 || finalErrCount > 455 {
t.Errorf("期望大约450个失败请求被拒绝执行,实际被拒绝的失败请求数量为: %d", finalErrCount)
}
}

func ExampleAdaptiveTimeoutRetryStrategy_Next() {
baseStrategy, err := NewExponentialBackoffRetryStrategy(time.Second, time.Second*5, 10)
if err != nil {
fmt.Println(err)
return
}
strategy, err := NewAdaptiveTimeoutRetryStrategy(baseStrategy,
WithThreshold(50), WithRingBufferLen(16))
if err != nil {
fmt.Println(err)
return
}
nextErr := errors.New("test error")
interval, ok := strategy.Next(context.Background(), nextErr)
for ok {
fmt.Println(interval)
interval, ok = strategy.Next(context.Background(), nextErr)
}
// Output:
// 1s
// 2s
// 4s
// 5s
// 5s
// 5s
// 5s
// 5s
// 5s
// 5s
}

type MockStrategy struct {
}

func (m MockStrategy) Next(ctx context.Context, err error) (time.Duration, bool) {
return 1 * time.Second, true
}
10 changes: 9 additions & 1 deletion retry/exponential.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package retry

import (
"context"
"math"
"sync/atomic"
"time"
Expand Down Expand Up @@ -49,7 +50,14 @@ func NewExponentialBackoffRetryStrategy(initialInterval, maxInterval time.Durati
}, nil
}

func (s *ExponentialBackoffRetryStrategy) Next() (time.Duration, bool) {
func (s *ExponentialBackoffRetryStrategy) Next(ctx context.Context, err error) (time.Duration, bool) {
if err != nil {
return s.next()
}
return 0, false
}

func (s *ExponentialBackoffRetryStrategy) next() (time.Duration, bool) {
retries := atomic.AddInt32(&s.retries, 1)
if s.maxRetries <= 0 || retries <= s.maxRetries {
if reached, ok := s.maxIntervalReached.Load().(bool); ok && reached {
Expand Down
Loading
Loading