diff --git a/retry/adaptive.go b/retry/adaptive.go new file mode 100644 index 0000000..9e166b8 --- /dev/null +++ b/retry/adaptive.go @@ -0,0 +1,90 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "math/bits" + "sync/atomic" + "time" +) + +var _ Strategy = (*AdaptiveTimeoutRetryStrategy)(nil) + +type AdaptiveTimeoutRetryStrategy struct { + strategy Strategy // 基础重试策略 + threshold int // 超时比率阈值 (单位:比特数量) + ringBuffer []uint64 // 比特环(滑动窗口存储超时信息) + reqCount uint64 // 请求数量 + bufferLen int // 滑动窗口长度 + bitCnt uint64 // 比特位总数 +} + +func (s *AdaptiveTimeoutRetryStrategy) Next() (time.Duration, bool) { + failCount := s.getFailed() + if failCount >= s.threshold { + return 0, false + } + return s.strategy.Next() +} + +func (s *AdaptiveTimeoutRetryStrategy) Report(err error) Strategy { + if err == nil { + s.markSuccess() + } else { + s.markFail() + } + return s +} + +func (s *AdaptiveTimeoutRetryStrategy) markSuccess() { + count := atomic.AddUint64(&s.reqCount, 1) + count = count % s.bitCnt + // 对2^x进行取模或者整除运算时可以用位运算代替除法和取模 + // count / 64 可以转换成 count >> 6。 位运算会更高效。 + idx := count >> 6 + // count % 64 可以转换成 count & 63 + bitPos := count & 63 + old := atomic.LoadUint64(&s.ringBuffer[idx]) + atomic.StoreUint64(&s.ringBuffer[idx], old&^(uint64(1)<> 6 + bitPos := count & 63 + old := atomic.LoadUint64(&s.ringBuffer[idx]) + // (uint64(1)<= 1500 { + err = mockErr + } + strategy.Report(err) + _, allowed := strategy.Next() + if err != nil { + // 失败请求的统计 + if allowed { + atomic.AddInt64(&successCount, 1) + } else { + atomic.AddInt64(&errCount, 1) + } + } + }(i) + } + + // 等待所有goroutine完成 + wg.Wait() + + // 验证结果:期望大约50个失败请求可以执行,450个被拒绝 + // 由于是环形缓冲区和并发执行,可能会有一些误差,这里使用一个合理的范围进行判断 + finalSuccessCount := int(atomic.LoadInt64(&successCount)) + finalErrCount := int(atomic.LoadInt64(&errCount)) + if finalSuccessCount < 45 || finalSuccessCount > 55 { + t.Errorf("期望大约50个失败请求被允许执行,实际允许执行的失败请求数量为: %d", finalSuccessCount) + } + + if finalErrCount < 445 || finalErrCount > 455 { + t.Errorf("期望大约450个失败请求被拒绝执行,实际被拒绝的失败请求数量为: %d", finalErrCount) + } +} + +func ExampleAdaptiveTimeoutRetryStrategy_Next() { + baseStrategy, err := NewExponentialBackoffRetryStrategy(time.Second, time.Second*5, 10) + if err != nil { + fmt.Println(err) + return + } + strategy := NewAdaptiveTimeoutRetryStrategy(baseStrategy, 16, 50) + interval, ok := strategy.Next() + for ok { + fmt.Println(interval) + interval, ok = strategy.Next() + } + // Output: + // 1s + // 2s + // 4s + // 5s + // 5s + // 5s + // 5s + // 5s + // 5s + // 5s +} + +type MockStrategy struct { +} + +func (m MockStrategy) Next() (time.Duration, bool) { + return 1 * time.Second, true +} + +func (m MockStrategy) Report(err error) Strategy { + return m +} diff --git a/retry/exponential.go b/retry/exponential.go index 98ad4c1..8d06e5a 100644 --- a/retry/exponential.go +++ b/retry/exponential.go @@ -22,6 +22,8 @@ import ( "github.com/ecodeclub/ekit/internal/errs" ) +var _ Strategy = (*ExponentialBackoffRetryStrategy)(nil) + // ExponentialBackoffRetryStrategy 指数退避重试 type ExponentialBackoffRetryStrategy struct { // 初始重试间隔 @@ -49,6 +51,10 @@ func NewExponentialBackoffRetryStrategy(initialInterval, maxInterval time.Durati }, nil } +func (s *ExponentialBackoffRetryStrategy) Report(err error) Strategy { + return s +} + func (s *ExponentialBackoffRetryStrategy) Next() (time.Duration, bool) { retries := atomic.AddInt32(&s.retries, 1) if s.maxRetries <= 0 || retries <= s.maxRetries { diff --git a/retry/exponential_test.go b/retry/exponential_test.go index e9172b0..d8f4c4e 100644 --- a/retry/exponential_test.go +++ b/retry/exponential_test.go @@ -15,6 +15,7 @@ package retry import ( + "context" "fmt" "testing" "time" @@ -82,12 +83,14 @@ func TestNewExponentialBackoffRetryStrategy_New(t *testing.T) { func TestExponentialBackoffRetryStrategy_Next(t *testing.T) { testCases := []struct { name string + ctx context.Context strategy *ExponentialBackoffRetryStrategy wantIntervals []time.Duration }{ { name: "stop if retries reaches maxRetries", + ctx: context.Background(), strategy: func() *ExponentialBackoffRetryStrategy { s, err := NewExponentialBackoffRetryStrategy(1*time.Second, 10*time.Second, 3) require.NoError(t, err) @@ -98,6 +101,7 @@ func TestExponentialBackoffRetryStrategy_Next(t *testing.T) { }, { name: "initialInterval over maxInterval", + ctx: context.Background(), strategy: func() *ExponentialBackoffRetryStrategy { s, err := NewExponentialBackoffRetryStrategy(1*time.Second, 4*time.Second, 5) require.NoError(t, err) diff --git a/retry/fixed_internal.go b/retry/fixed_internal.go index 19d18aa..ea210db 100644 --- a/retry/fixed_internal.go +++ b/retry/fixed_internal.go @@ -21,6 +21,8 @@ import ( "github.com/ecodeclub/ekit/internal/errs" ) +var _ Strategy = (*FixedIntervalRetryStrategy)(nil) + // FixedIntervalRetryStrategy 等间隔重试 type FixedIntervalRetryStrategy struct { maxRetries int32 // 最大重试次数,如果是 0 或负数,表示无限重试 @@ -45,3 +47,7 @@ func (s *FixedIntervalRetryStrategy) Next() (time.Duration, bool) { } return 0, false } + +func (s *FixedIntervalRetryStrategy) Report(err error) Strategy { + return s +} diff --git a/retry/fixed_internal_test.go b/retry/fixed_internal_test.go index ba4e135..e9deabb 100644 --- a/retry/fixed_internal_test.go +++ b/retry/fixed_internal_test.go @@ -15,6 +15,7 @@ package retry import ( + "context" "fmt" "testing" "time" @@ -30,6 +31,7 @@ func TestFixedIntervalRetryStrategy_Next(t *testing.T) { testCases := []struct { name string + ctx context.Context s *FixedIntervalRetryStrategy interval time.Duration @@ -37,16 +39,17 @@ func TestFixedIntervalRetryStrategy_Next(t *testing.T) { }{ { name: "init case, retries 0", + ctx: context.Background(), s: &FixedIntervalRetryStrategy{ maxRetries: 3, interval: time.Second, }, - interval: time.Second, isContinue: true, }, { name: "retries equals to MaxRetries 3 after the increase", + ctx: context.Background(), s: &FixedIntervalRetryStrategy{ maxRetries: 3, interval: time.Second, @@ -57,6 +60,7 @@ func TestFixedIntervalRetryStrategy_Next(t *testing.T) { }, { name: "retries over MaxRetries after the increase", + ctx: context.Background(), s: &FixedIntervalRetryStrategy{ maxRetries: 3, interval: time.Second, @@ -67,6 +71,7 @@ func TestFixedIntervalRetryStrategy_Next(t *testing.T) { }, { name: "MaxRetries equals to 0", + ctx: context.Background(), s: &FixedIntervalRetryStrategy{ maxRetries: 0, interval: time.Second, @@ -76,6 +81,7 @@ func TestFixedIntervalRetryStrategy_Next(t *testing.T) { }, { name: "negative MaxRetries", + ctx: context.Background(), s: &FixedIntervalRetryStrategy{ maxRetries: -1, interval: time.Second, diff --git a/retry/types.go b/retry/types.go index 7b856ee..11ae405 100644 --- a/retry/types.go +++ b/retry/types.go @@ -14,9 +14,12 @@ package retry -import "time" +import ( + "time" +) type Strategy interface { // Next 返回下一次重试的间隔,如果不需要继续重试,那么第二参数返回 false Next() (time.Duration, bool) + Report(err error) Strategy }