package client

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"

	"github.com/cenkalti/backoff/v4"
	"github.com/go-logr/logr"
)

var errTemporary = errors.New("temporary error occurred")

// NewRetryWrapper returns a TransportWrapper which detects whether
// an HTTP request should be retried given a particular failure scenario.
// A variadic slice of options can be provided to override the default
// retry behavior.
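//
// A minimal usage sketch (the endpoint URL is hypothetical):
//
//	wrapper := NewRetryWrapper(WithMaxRetries(3))
//	client := &http.Client{Transport: wrapper.Wrap(http.DefaultTransport)}
//	res, err := client.Get("https://api.example.com/healthz")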
func NewRetryWrapper(opts ...RetryWrapperOption) *RetryWrapper {
	var cfg RetryWrapperConfig
	cfg.Option(opts...)
	cfg.Default()

	if cfg.maxRetries > 0 {
		inner := cfg.GenerateBackoff
		cfg.GenerateBackoff = func() backoff.BackOff {
			return backoff.WithMaxRetries(inner(), cfg.maxRetries)
		}
	}

	return &RetryWrapper{
		cfg: cfg,
	}
}

// RetryWrapper is a TransportWrapper which transparently retries failed
// HTTP requests according to its configured RetryPolicy and backoff
// strategy.
type RetryWrapper struct {
	cfg RetryWrapperConfig
	rt  http.RoundTripper
}

// Wrap stores the given http.RoundTripper as the underlying transport
// and returns the wrapper itself in its place.
func (w *RetryWrapper) Wrap(rt http.RoundTripper) http.RoundTripper {
	w.rt = rt
	return w
}

// RoundTrip implements http.RoundTripper. It delegates to the wrapped
// transport, retrying the request with backoff whenever the configured
// RetryPolicy deems the failure retryable.
func (w *RetryWrapper) RoundTrip(req *http.Request) (*http.Response, error) {
	log := w.cfg.Logger.WithValues(
		"method", req.Method,
		"host", req.URL.Host,
		"path", req.URL.Path,
	)

	// preserve the request body so that each attempt can be made with a
	// readable body
	bodyCopy, err := copyRequestBody(req)
	if err != nil {
		return nil, fmt.Errorf("copying request body: %w", err)
	}

	retries := 0
	var res *http.Response
	roundtrip := func() error {
		if retries > 0 {
			log.Info("retrying request",
				"retries", retries,
			)
		}

		if bodyCopy != nil {
			req.Body = io.NopCloser(bytes.NewBuffer(bodyCopy))
		}

		// drain any open response body from the previous attempt so that
		// existing connections may be reused
		if res != nil {
			drainResponseBody(w.cfg.Logger.V(1), res)
		}

		var err error
		res, err = w.rt.RoundTrip(req)
		if err != nil {
			if !w.cfg.Policy.IsErrorRetryable(err) {
				// exit with error if the request failed before a response
				// was received and the error does not permit retry
				return backoff.Permanent(err)
			}

			retries++

			return errTemporary
		}

		log.Info("received response",
			"responseStatus", res.StatusCode,
		)

		if !w.cfg.Policy.IsStatusRetryableForMethod(req.Method, res.StatusCode) {
			// exit with no error if the HTTP status code does not permit retry
			return nil
		}

		retries++

		// exit with a temporary error to retry the request
		return errTemporary
	}

	bo := backoff.WithContext(w.cfg.GenerateBackoff(), req.Context())
	if err := backoff.Retry(roundtrip, bo); err != nil {
		if !errors.Is(err, errTemporary) && !errors.Is(err, context.DeadlineExceeded) {
			return nil, fmt.Errorf("permanent error encountered: %w", err)
		}

		// retries were exhausted or the deadline was exceeded; if no
		// response was ever received, surface the error rather than
		// returning a nil response alongside a nil error
		if res == nil {
			return nil, err
		}
	}

	// return the most recent response, even if its status would otherwise
	// have permitted another retry
	return res, nil
}
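
// A custom RetryPolicy can be supplied via RetryWrapperConfig.Policy. A
// minimal sketch with a hypothetical getOnlyPolicy, assuming the
// interface methods match the call sites in RoundTrip above:
//
//	type getOnlyPolicy struct{}
//
//	func (getOnlyPolicy) IsErrorRetryable(err error) bool { return true }
//
//	func (getOnlyPolicy) IsStatusRetryableForMethod(method string, status int) bool {
//		return method == http.MethodGet && status >= 500
//	}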

// copyRequestBody reads and closes the request body, returning its
// contents so the body can be replayed on each retry attempt. It returns
// nil for requests without a body.
func copyRequestBody(req *http.Request) ([]byte, error) {
	if req.Body == nil || req.Body == http.NoBody {
		return nil, nil
	}

	body, err := io.ReadAll(req.Body)
	if err != nil {
		return nil, fmt.Errorf("reading request body: %w", err)
	}

	if err := req.Body.Close(); err != nil {
		return nil, fmt.Errorf("closing request body: %w", err)
	}

	return body, nil
}

// drainResponseBody discards and closes the response body so that the
// underlying connection can be reused for the next attempt.
func drainResponseBody(logger logr.Logger, res *http.Response) {
	defer res.Body.Close()

	if _, err := io.Copy(io.Discard, res.Body); err != nil {
		logger.Info("unable to discard response body",
			"error", err,
		)
	}
}

// RetryWrapperConfig holds the configurable behavior of a RetryWrapper.
type RetryWrapperConfig struct {
	Logger          logr.Logger
	GenerateBackoff func() backoff.BackOff
	Policy          RetryPolicy
	maxRetries      uint64
}

// Option applies the given options to the config in order.
func (c *RetryWrapperConfig) Option(opts ...RetryWrapperOption) {
	for _, opt := range opts {
		opt.ConfigureRetryWrapper(c)
	}
}

// Default fills in any unset fields with sane defaults.
func (c *RetryWrapperConfig) Default() {
	if c.Logger.GetSink() == nil {
		c.Logger = logr.Discard()
	}
	if c.GenerateBackoff == nil {
		c.GenerateBackoff = ExponentialBackoffGenerator()
	}
	if c.Policy == nil {
		c.Policy = NewDefaultRetryPolicy()
	}
}

// RetryWrapperOption configures a RetryWrapperConfig.
type RetryWrapperOption interface {
	ConfigureRetryWrapper(*RetryWrapperConfig)
}

// WithLogger configures a RetryWrapper instance with the provided
// logr.Logger instance.
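//
// A minimal sketch, assuming log is an existing logr.Logger:
//
//	wrapper := NewRetryWrapper(WithLogger{Logger: log})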
type WithLogger struct{ logr.Logger }

func (l WithLogger) ConfigureRetryWrapper(c *RetryWrapperConfig) {
	c.Logger = l.Logger
}

// WithBackoffGenerator configures a RetryWrapper instance with the
// provided BackoffGenerator.
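//
// A minimal sketch using a constant backoff (the 500ms interval is
// illustrative; the func literal requires the time package):
//
//	gen := WithBackoffGenerator(func() backoff.BackOff {
//		return backoff.NewConstantBackOff(500 * time.Millisecond)
//	})
//	wrapper := NewRetryWrapper(gen)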
type WithBackoffGenerator func() backoff.BackOff

func (bg WithBackoffGenerator) ConfigureRetryWrapper(c *RetryWrapperConfig) {
	c.GenerateBackoff = bg
}

// WithMaxRetries caps the number of retry attempts for a RetryWrapper
// instance; a value of N permits up to N retries after the initial
// attempt. A value of zero leaves the backoff strategy uncapped.
type WithMaxRetries uint64

func (mr WithMaxRetries) ConfigureRetryWrapper(c *RetryWrapperConfig) {
	c.maxRetries = uint64(mr)
}