diff --git a/go.mod b/go.mod index cf8371c8b3c..db9087e7768 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 github.com/dustin/go-humanize v1.0.1 github.com/edsrzf/mmap-go v1.1.0 - github.com/failsafe-go/failsafe-go v0.6.2 + github.com/failsafe-go/failsafe-go v0.6.8 github.com/felixge/fgprof v0.9.4 github.com/go-kit/log v0.2.1 github.com/go-openapi/strfmt v0.23.0 diff --git a/go.sum b/go.sum index e330586c7d2..cb81a7842a7 100644 --- a/go.sum +++ b/go.sum @@ -277,8 +277,8 @@ github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb h1:IT4JYU7k4ikYg1SCxNI1/Tieq/NFvh6dzLdgi7eu0tM= github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb/go.mod h1:bH6Xx7IW64qjjJq8M2u4dxNaBiDfKK+z/3eGDpXEQhc= -github.com/failsafe-go/failsafe-go v0.6.2 h1:zRyfYykM080+h40uUuf9HYLRn7vpnR+wjcg68fhwD28= -github.com/failsafe-go/failsafe-go v0.6.2/go.mod h1:UCRnPYTVzBt7QGPFAAmFZUtB49dCLVFt38YYzGHXBCA= +github.com/failsafe-go/failsafe-go v0.6.8 h1:ERrJMknjXdtDVrx1s05uE5MCDhGTiF7rQ98z6bdVUOw= +github.com/failsafe-go/failsafe-go v0.6.8/go.mod h1:LAo0yJE2PXn1z4T22bkmUxPryrTHUvMhvnwik9x2uq8= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= diff --git a/vendor/github.com/failsafe-go/failsafe-go/CHANGELOG.md b/vendor/github.com/failsafe-go/failsafe-go/CHANGELOG.md index d6f7eaa033e..46a88c95992 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/CHANGELOG.md +++ b/vendor/github.com/failsafe-go/failsafe-go/CHANGELOG.md @@ -1,14 +1,64 @@ ## Upcoming Release +## 0.6.8 + +### Bug Fixes + +- Fixed #65 - Mixing failsafehttp.RetryPolicy with HedgePolicy causes contexts to be canceled early. + +### Improvements + +- Added `Context()` to `circuitbreaker.StateChangedEvent`. + +## 0.6.7 + +### Improvements + +- Added `HandleErrorTypes` to match errors by type in retry policies, circuit breakers, and fallbacks. This is similar to the matching that `errors.As` does. +- Added `RetryPolicyBuilder.AbortOnErrorTypes`. +- Added `HedgePolicyBuilder.CancelOnErrorTypes`. + +### API Changes + +- Renamed the `retrypolicy.ExceededError` `LastResult()` to `LastResult` and `LastError()` to `LastError`. + +## 0.6.6 + +### Bug Fixes + +- Always expose metrics from old state in `StateChangedEvent`. + +## 0.6.5 + +### Improvements + +- Added gRPC unary client, unary server, and server inHandle support. +- Expose Context() in event listeners. +- Improve HTTP context cancellation. +- Add `Metrics()` to `circuitbreaker.StateChangedEvent`. +- Default `failsafehttp.RetryPolicyBuilder()` to abort on `context.Canceled`. + +## 0.6.4 + +### Bug Fixes + +- Improve bulkhead handling of short max wait times. + +## 0.6.3 + +### Improvements + +- Optimized memory usage in time based circuit breakers. + ## 0.6.2 -## Improvements +### Improvements - New CachePolicy. ## 0.6.1 -## Improvements +### Improvements - Better support for HedgePolicy and Timeout composition diff --git a/vendor/github.com/failsafe-go/failsafe-go/Makefile b/vendor/github.com/failsafe-go/failsafe-go/Makefile index 7d9e70f6c10..733fc8430d3 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/Makefile +++ b/vendor/github.com/failsafe-go/failsafe-go/Makefile @@ -26,4 +26,4 @@ lint: ## Lint Failsafe-go .PHONY: check check: fmt test ## Check Failsafe-go for a commit or release - go mod tidy \ No newline at end of file + go mod tidy diff --git a/vendor/github.com/failsafe-go/failsafe-go/README.md b/vendor/github.com/failsafe-go/failsafe-go/README.md index e2fce70c5a7..dfe47896bb8 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/README.md +++ b/vendor/github.com/failsafe-go/failsafe-go/README.md @@ -7,7 +7,7 @@ [![Slack](https://img.shields.io/badge/slack-failsafe-brightgreen.svg?logo=slack)](https://failsafe-go.slack.com) [![Godoc](https://pkg.go.dev/badge/github.com/failsafe-go/failsafe-go)](https://pkg.go.dev/github.com/failsafe-go/failsafe-go) -Failsafe-go is a library for building fault tolerant Go applications. It works by wrapping executable logic with one or more resilience policies, which can be combined and composed as needed. +Failsafe-go is a library for building resilient, fault tolerant Go applications. It works by wrapping functions with one or more resilience policies, which can be combined and composed as needed. Policies include [Retry](https://failsafe-go.dev/retry), [CircuitBreaker](https://failsafe-go.dev/circuit-breaker), [RateLimiter](https://failsafe-go.dev/rate-limiter), [Timeout](https://failsafe-go.dev/timeout), [Fallback](https://failsafe-go.dev/fallback), [Hedge](https://failsafe-go.dev/hedge/), [Bulkhead](https://failsafe-go.dev/bulkhead), and [Cache](https://failsafe-go.dev/cache). diff --git a/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitbreaker.go b/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitbreaker.go index 6402c3ef640..8a6024c6443 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitbreaker.go +++ b/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitbreaker.go @@ -1,6 +1,7 @@ package circuitbreaker import ( + "context" "errors" "sync" "time" @@ -144,11 +145,24 @@ type Metrics interface { type StateChangedEvent struct { OldState State NewState State + metrics *eventMetrics + context context.Context +} + +// Metrics returns metrics from the CircuitBreaker old state. +func (e *StateChangedEvent) Metrics() Metrics { + return e.metrics +} + +// Context returns the context configured for the execution, else context.Background if none was configured. For +// executions involving a timeout or hedge, each attempt will get a separate child context. +func (e *StateChangedEvent) Context() context.Context { + return e.context } type circuitBreaker[R any] struct { - config *circuitBreakerConfig[R] - mtx sync.Mutex + *config[R] + mtx sync.Mutex // Guarded by mtx state circuitState[R] } @@ -180,18 +194,16 @@ func (cb *circuitBreaker[R]) Close() { func (cb *circuitBreaker[R]) State() State { cb.mtx.Lock() defer cb.mtx.Unlock() - return cb.state.getState() + return cb.state.state() } func (cb *circuitBreaker[R]) RemainingDelay() time.Duration { cb.mtx.Lock() defer cb.mtx.Unlock() - return cb.state.getRemainingDelay() + return cb.state.remainingDelay() } func (cb *circuitBreaker[R]) Metrics() Metrics { - cb.mtx.Lock() - defer cb.mtx.Unlock() return cb } @@ -210,31 +222,31 @@ func (cb *circuitBreaker[R]) IsClosed() bool { func (cb *circuitBreaker[R]) Executions() uint { cb.mtx.Lock() defer cb.mtx.Unlock() - return cb.state.getStats().getExecutionCount() + return cb.state.executionCount() } func (cb *circuitBreaker[R]) Failures() uint { cb.mtx.Lock() defer cb.mtx.Unlock() - return cb.state.getStats().getFailureCount() + return cb.state.failureCount() } func (cb *circuitBreaker[R]) FailureRate() uint { cb.mtx.Lock() defer cb.mtx.Unlock() - return cb.state.getStats().getFailureRate() + return cb.state.failureRate() } func (cb *circuitBreaker[R]) Successes() uint { cb.mtx.Lock() defer cb.mtx.Unlock() - return cb.state.getStats().getSuccessCount() + return cb.state.successCount() } func (cb *circuitBreaker[R]) SuccessRate() uint { cb.mtx.Lock() defer cb.mtx.Unlock() - return cb.state.getStats().getSuccessRate() + return cb.state.successRate() } func (cb *circuitBreaker[R]) RecordFailure() { @@ -246,7 +258,7 @@ func (cb *circuitBreaker[R]) RecordFailure() { func (cb *circuitBreaker[R]) RecordError(err error) { cb.mtx.Lock() defer cb.mtx.Unlock() - cb.recordResult(*(new(R)), err) + cb.recordResult(*new(R), err) } func (cb *circuitBreaker[R]) RecordResult(result R) { @@ -262,9 +274,9 @@ func (cb *circuitBreaker[R]) RecordSuccess() { } func (cb *circuitBreaker[R]) ToExecutor(_ R) any { - cbe := &circuitBreakerExecutor[R]{ + cbe := &executor[R]{ BaseExecutor: &policy.BaseExecutor[R]{ - BaseFailurePolicy: cb.config.BaseFailurePolicy, + BaseFailurePolicy: cb.BaseFailurePolicy, }, circuitBreaker: cb, } @@ -277,15 +289,15 @@ func (cb *circuitBreaker[R]) ToExecutor(_ R) any { // Requires external locking. func (cb *circuitBreaker[R]) transitionTo(newState State, exec failsafe.Execution[R], listener func(StateChangedEvent)) { transitioned := false - currentState := cb.state.getState() - if currentState != newState { + currentState := cb.state + if currentState.state() != newState { switch newState { case ClosedState: cb.state = newClosedState(cb) case OpenState: - delay := cb.config.ComputeDelay(exec) + delay := cb.ComputeDelay(exec) if delay == -1 { - delay = cb.config.Delay + delay = cb.Delay } cb.state = newOpenState(cb, cb.state, delay) case HalfOpenState: @@ -294,20 +306,50 @@ func (cb *circuitBreaker[R]) transitionTo(newState State, exec failsafe.Executio transitioned = true } - if transitioned { + if transitioned && (listener != nil || cb.stateChangedListener != nil) { + ctx := context.Background() + if exec != nil { + ctx = exec.Context() + } event := StateChangedEvent{ - OldState: currentState, + OldState: currentState.state(), NewState: newState, - } - if cb.config.stateChangedListener != nil { - cb.config.stateChangedListener(event) + metrics: &eventMetrics{currentState}, + context: ctx, } if listener != nil { listener(event) } + if cb.stateChangedListener != nil { + cb.stateChangedListener(event) + } } } +type eventMetrics struct { + stats stats +} + +func (m *eventMetrics) Executions() uint { + return m.stats.executionCount() +} + +func (m *eventMetrics) Failures() uint { + return m.stats.failureCount() +} + +func (m *eventMetrics) FailureRate() uint { + return m.stats.failureRate() +} + +func (m *eventMetrics) Successes() uint { + return m.stats.successCount() +} + +func (m *eventMetrics) SuccessRate() uint { + return m.stats.successRate() +} + // Requires external locking. func (cb *circuitBreaker[R]) tryAcquirePermit() bool { return cb.state.tryAcquirePermit() @@ -318,22 +360,22 @@ func (cb *circuitBreaker[R]) tryAcquirePermit() bool { // // Requires external locking. func (cb *circuitBreaker[R]) open(execution failsafe.Execution[R]) { - cb.transitionTo(OpenState, execution, cb.config.openListener) + cb.transitionTo(OpenState, execution, cb.openListener) } // Requires external locking. func (cb *circuitBreaker[R]) close() { - cb.transitionTo(ClosedState, nil, cb.config.closeListener) + cb.transitionTo(ClosedState, nil, cb.closeListener) } // Requires external locking. func (cb *circuitBreaker[R]) halfOpen() { - cb.transitionTo(HalfOpenState, nil, cb.config.halfOpenListener) + cb.transitionTo(HalfOpenState, nil, cb.halfOpenListener) } // Requires external locking. func (cb *circuitBreaker[R]) recordResult(result R, err error) { - if cb.config.IsFailure(result, err) { + if cb.IsFailure(result, err) { cb.recordFailure(nil) } else { cb.recordSuccess() @@ -342,17 +384,17 @@ func (cb *circuitBreaker[R]) recordResult(result R, err error) { // Requires external locking. func (cb *circuitBreaker[R]) recordSuccess() { - cb.state.getStats().recordSuccess() + cb.state.recordSuccess() cb.state.checkThresholdAndReleasePermit(nil) } // Requires external locking. func (cb *circuitBreaker[R]) recordFailure(exec failsafe.Execution[R]) { - cb.state.getStats().recordFailure() + cb.state.recordFailure() cb.state.checkThresholdAndReleasePermit(exec) } func (cb *circuitBreaker[R]) Reset() { cb.close() - cb.state.getStats().reset() + cb.state.reset() } diff --git a/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitbreakerbuilder.go b/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitbreakerbuilder.go index 93c884ff677..210a5e7ddb1 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitbreakerbuilder.go +++ b/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitbreakerbuilder.go @@ -66,6 +66,12 @@ type CircuitBreakerBuilder[R any] interface { // in a HalfOpenSttate state to determine whether to transition back to open or closed. WithFailureRateThreshold(failureRateThreshold uint, failureExecutionThreshold uint, failureThresholdingPeriod time.Duration) CircuitBreakerBuilder[R] + // WithDelay configures the delay to wait in OpenState before transitioning to HalfOpenState. + WithDelay(delay time.Duration) CircuitBreakerBuilder[R] + + // WithDelayFunc configures a function that provides the delay to wait in OpenState before transitioning to HalfOpenState. + WithDelayFunc(delayFunc failsafe.DelayFunc[R]) CircuitBreakerBuilder[R] + // WithSuccessThreshold configures count based success thresholding by setting the number of consecutive successful // executions that must occur when in a HalfOpenState in order to close the circuit, else the circuit is re-opened when a // failure occurs. @@ -80,7 +86,7 @@ type CircuitBreakerBuilder[R any] interface { Build() CircuitBreaker[R] } -type circuitBreakerConfig[R any] struct { +type config[R any] struct { *policy.BaseFailurePolicy[R] *policy.BaseDelayablePolicy[R] clock util.Clock @@ -101,7 +107,7 @@ type circuitBreakerConfig[R any] struct { successThresholdingCapacity uint } -var _ CircuitBreakerBuilder[any] = &circuitBreakerConfig[any]{} +var _ CircuitBreakerBuilder[any] = &config[any]{} // WithDefaults creates a count based CircuitBreaker for execution result type R that opens after a single failure, // closes after a single success, and has a 1 minute delay by default. To configure additional options on a @@ -114,10 +120,10 @@ func WithDefaults[R any]() CircuitBreaker[R] { // breaker that opens after a single failure, closes after a single success, and has a 1 minute delay, unless configured // otherwise. func Builder[R any]() CircuitBreakerBuilder[R] { - return &circuitBreakerConfig[R]{ + return &config[R]{ BaseFailurePolicy: &policy.BaseFailurePolicy[R]{}, BaseDelayablePolicy: &policy.BaseDelayablePolicy[R]{ - Delay: 1 * time.Minute, + Delay: time.Minute, }, clock: util.NewClock(), failureThreshold: 1, @@ -125,7 +131,7 @@ func Builder[R any]() CircuitBreakerBuilder[R] { } } -func (c *circuitBreakerConfig[R]) Build() CircuitBreaker[R] { +func (c *config[R]) Build() CircuitBreaker[R] { breaker := &circuitBreaker[R]{ config: c, // TODO copy base fields } @@ -133,32 +139,37 @@ func (c *circuitBreakerConfig[R]) Build() CircuitBreaker[R] { return breaker } -func (c *circuitBreakerConfig[R]) HandleErrors(errs ...error) CircuitBreakerBuilder[R] { +func (c *config[R]) HandleErrors(errs ...error) CircuitBreakerBuilder[R] { c.BaseFailurePolicy.HandleErrors(errs...) return c } -func (c *circuitBreakerConfig[R]) HandleResult(result R) CircuitBreakerBuilder[R] { +func (c *config[R]) HandleErrorTypes(errs ...any) CircuitBreakerBuilder[R] { + c.BaseFailurePolicy.HandleErrorTypes(errs...) + return c +} + +func (c *config[R]) HandleResult(result R) CircuitBreakerBuilder[R] { c.BaseFailurePolicy.HandleResult(result) return c } -func (c *circuitBreakerConfig[R]) HandleIf(predicate func(R, error) bool) CircuitBreakerBuilder[R] { +func (c *config[R]) HandleIf(predicate func(R, error) bool) CircuitBreakerBuilder[R] { c.BaseFailurePolicy.HandleIf(predicate) return c } -func (c *circuitBreakerConfig[R]) WithFailureThreshold(failureThreshold uint) CircuitBreakerBuilder[R] { +func (c *config[R]) WithFailureThreshold(failureThreshold uint) CircuitBreakerBuilder[R] { return c.WithFailureThresholdRatio(failureThreshold, failureThreshold) } -func (c *circuitBreakerConfig[R]) WithFailureThresholdRatio(failureThreshold uint, failureThresholdingCapacity uint) CircuitBreakerBuilder[R] { +func (c *config[R]) WithFailureThresholdRatio(failureThreshold uint, failureThresholdingCapacity uint) CircuitBreakerBuilder[R] { c.failureThreshold = failureThreshold c.failureThresholdingCapacity = failureThresholdingCapacity return c } -func (c *circuitBreakerConfig[R]) WithFailureThresholdPeriod(failureThreshold uint, failureThresholdingPeriod time.Duration) CircuitBreakerBuilder[R] { +func (c *config[R]) WithFailureThresholdPeriod(failureThreshold uint, failureThresholdingPeriod time.Duration) CircuitBreakerBuilder[R] { c.failureThreshold = failureThreshold c.failureThresholdingCapacity = failureThreshold c.failureExecutionThreshold = failureThreshold @@ -166,59 +177,59 @@ func (c *circuitBreakerConfig[R]) WithFailureThresholdPeriod(failureThreshold ui return c } -func (c *circuitBreakerConfig[R]) WithFailureRateThreshold(failureRateThreshold uint, failureExecutionThreshold uint, failureThresholdingPeriod time.Duration) CircuitBreakerBuilder[R] { +func (c *config[R]) WithFailureRateThreshold(failureRateThreshold uint, failureExecutionThreshold uint, failureThresholdingPeriod time.Duration) CircuitBreakerBuilder[R] { c.failureRateThreshold = failureRateThreshold c.failureExecutionThreshold = failureExecutionThreshold c.failureThresholdingPeriod = failureThresholdingPeriod return c } -func (c *circuitBreakerConfig[R]) WithSuccessThreshold(successThreshold uint) CircuitBreakerBuilder[R] { +func (c *config[R]) WithSuccessThreshold(successThreshold uint) CircuitBreakerBuilder[R] { return c.WithSuccessThresholdRatio(successThreshold, successThreshold) } -func (c *circuitBreakerConfig[R]) WithSuccessThresholdRatio(successThreshold uint, successThresholdingCapacity uint) CircuitBreakerBuilder[R] { +func (c *config[R]) WithSuccessThresholdRatio(successThreshold uint, successThresholdingCapacity uint) CircuitBreakerBuilder[R] { c.successThreshold = successThreshold c.successThresholdingCapacity = successThresholdingCapacity return c } -func (c *circuitBreakerConfig[R]) WithDelay(delay time.Duration) CircuitBreakerBuilder[R] { +func (c *config[R]) WithDelay(delay time.Duration) CircuitBreakerBuilder[R] { c.BaseDelayablePolicy.WithDelay(delay) return c } -func (c *circuitBreakerConfig[R]) WithDelayFunc(delayFunc failsafe.DelayFunc[R]) CircuitBreakerBuilder[R] { +func (c *config[R]) WithDelayFunc(delayFunc failsafe.DelayFunc[R]) CircuitBreakerBuilder[R] { c.BaseDelayablePolicy.WithDelayFunc(delayFunc) return c } -func (c *circuitBreakerConfig[R]) OnStateChanged(listener func(event StateChangedEvent)) CircuitBreakerBuilder[R] { +func (c *config[R]) OnStateChanged(listener func(event StateChangedEvent)) CircuitBreakerBuilder[R] { c.stateChangedListener = listener return c } -func (c *circuitBreakerConfig[R]) OnClose(listener func(event StateChangedEvent)) CircuitBreakerBuilder[R] { +func (c *config[R]) OnClose(listener func(event StateChangedEvent)) CircuitBreakerBuilder[R] { c.closeListener = listener return c } -func (c *circuitBreakerConfig[R]) OnOpen(listener func(event StateChangedEvent)) CircuitBreakerBuilder[R] { +func (c *config[R]) OnOpen(listener func(event StateChangedEvent)) CircuitBreakerBuilder[R] { c.openListener = listener return c } -func (c *circuitBreakerConfig[R]) OnHalfOpen(listener func(event StateChangedEvent)) CircuitBreakerBuilder[R] { +func (c *config[R]) OnHalfOpen(listener func(event StateChangedEvent)) CircuitBreakerBuilder[R] { c.halfOpenListener = listener return c } -func (c *circuitBreakerConfig[R]) OnSuccess(listener func(event failsafe.ExecutionEvent[R])) CircuitBreakerBuilder[R] { +func (c *config[R]) OnSuccess(listener func(event failsafe.ExecutionEvent[R])) CircuitBreakerBuilder[R] { c.BaseFailurePolicy.OnSuccess(listener) return c } -func (c *circuitBreakerConfig[R]) OnFailure(listener func(event failsafe.ExecutionEvent[R])) CircuitBreakerBuilder[R] { +func (c *config[R]) OnFailure(listener func(event failsafe.ExecutionEvent[R])) CircuitBreakerBuilder[R] { c.BaseFailurePolicy.OnFailure(listener) return c } diff --git a/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitbreakerexecutor.go b/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitbreakerexecutor.go index 91316f96d01..3adbc6eb690 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitbreakerexecutor.go +++ b/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitbreakerexecutor.go @@ -6,27 +6,27 @@ import ( "github.com/failsafe-go/failsafe-go/policy" ) -// circuitBreakerExecutor is a policy.Executor that handles failures according to a CircuitBreaker. -type circuitBreakerExecutor[R any] struct { +// executor is a policy.Executor that handles failures according to a CircuitBreaker. +type executor[R any] struct { *policy.BaseExecutor[R] *circuitBreaker[R] } -var _ policy.Executor[any] = &circuitBreakerExecutor[any]{} +var _ policy.Executor[any] = &executor[any]{} -func (e *circuitBreakerExecutor[R]) PreExecute(_ policy.ExecutionInternal[R]) *common.PolicyResult[R] { +func (e *executor[R]) PreExecute(_ policy.ExecutionInternal[R]) *common.PolicyResult[R] { if !e.TryAcquirePermit() { return internal.FailureResult[R](ErrOpen) } return nil } -func (e *circuitBreakerExecutor[R]) OnSuccess(exec policy.ExecutionInternal[R], result *common.PolicyResult[R]) { +func (e *executor[R]) OnSuccess(exec policy.ExecutionInternal[R], result *common.PolicyResult[R]) { e.BaseExecutor.OnSuccess(exec, result) e.RecordSuccess() } -func (e *circuitBreakerExecutor[R]) OnFailure(exec policy.ExecutionInternal[R], result *common.PolicyResult[R]) *common.PolicyResult[R] { +func (e *executor[R]) OnFailure(exec policy.ExecutionInternal[R], result *common.PolicyResult[R]) *common.PolicyResult[R] { // Wrap the result in the execution so it's available when computing a delay exec = exec.CopyWithResult(result).(policy.ExecutionInternal[R]) e.BaseExecutor.OnFailure(exec, result) diff --git a/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitstates.go b/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitstates.go index bf2d8a93a7f..bce390cb355 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitstates.go +++ b/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitstates.go @@ -7,25 +7,26 @@ import ( ) // State of a CircuitBreaker. +// Implementations are not concurrency safe and must be guarded externally. type circuitState[R any] interface { - getState() State - getStats() circuitStats - getRemainingDelay() time.Duration + stats + state() State + remainingDelay() time.Duration tryAcquirePermit() bool checkThresholdAndReleasePermit(exec failsafe.Execution[R]) } type closedState[R any] struct { breaker *circuitBreaker[R] - stats circuitStats + stats } func newClosedState[R any](breaker *circuitBreaker[R]) *closedState[R] { var capacity uint - if breaker.config.failureExecutionThreshold != 0 { - capacity = breaker.config.failureExecutionThreshold + if breaker.failureExecutionThreshold != 0 { + capacity = breaker.failureExecutionThreshold } else { - capacity = breaker.config.failureThresholdingCapacity + capacity = breaker.failureThresholdingCapacity } return &closedState[R]{ breaker: breaker, @@ -33,15 +34,11 @@ func newClosedState[R any](breaker *circuitBreaker[R]) *closedState[R] { } } -func (s *closedState[R]) getState() State { +func (s *closedState[R]) state() State { return ClosedState } -func (s *closedState[R]) getStats() circuitStats { - return s.stats -} - -func (s *closedState[R]) getRemainingDelay() time.Duration { +func (s *closedState[R]) remainingDelay() time.Duration { return 0 } @@ -52,19 +49,19 @@ func (s *closedState[R]) tryAcquirePermit() bool { // Checks to see if the executions and failure thresholds have been exceeded, opening the circuit if so. func (s *closedState[R]) checkThresholdAndReleasePermit(exec failsafe.Execution[R]) { // Execution threshold can only be set for time based thresholding - if s.stats.getExecutionCount() >= s.breaker.config.failureExecutionThreshold { + if s.executionCount() >= s.breaker.failureExecutionThreshold { // Failure rate threshold can only be set for time based thresholding - failureRateThreshold := s.breaker.config.failureRateThreshold - if (failureRateThreshold != 0 && s.stats.getFailureRate() >= failureRateThreshold) || - (failureRateThreshold == 0 && s.stats.getFailureCount() >= s.breaker.config.failureThreshold) { + failureRateThreshold := s.breaker.failureRateThreshold + if (failureRateThreshold != 0 && s.failureRate() >= failureRateThreshold) || + (failureRateThreshold == 0 && s.failureCount() >= s.breaker.failureThreshold) { s.breaker.open(exec) } } } type openState[R any] struct { - breaker *circuitBreaker[R] - stats circuitStats + breaker *circuitBreaker[R] + stats startTime int64 delay time.Duration } @@ -72,27 +69,23 @@ type openState[R any] struct { func newOpenState[R any](breaker *circuitBreaker[R], previousState circuitState[R], delay time.Duration) *openState[R] { return &openState[R]{ breaker: breaker, - stats: previousState.getStats(), - startTime: breaker.config.clock.CurrentUnixNano(), + stats: previousState, + startTime: breaker.clock.CurrentUnixNano(), delay: delay, } } -func (s *openState[R]) getState() State { +func (s *openState[R]) state() State { return OpenState } -func (s *openState[R]) getStats() circuitStats { - return s.stats -} - -func (s *openState[R]) getRemainingDelay() time.Duration { - elapsedTime := s.breaker.config.clock.CurrentUnixNano() - s.startTime +func (s *openState[R]) remainingDelay() time.Duration { + elapsedTime := s.breaker.clock.CurrentUnixNano() - s.startTime return max(0, s.delay-time.Duration(elapsedTime)) } func (s *openState[R]) tryAcquirePermit() bool { - if s.breaker.config.clock.CurrentUnixNano()-s.startTime >= s.delay.Nanoseconds() { + if s.breaker.clock.CurrentUnixNano()-s.startTime >= s.delay.Nanoseconds() { s.breaker.halfOpen() return s.breaker.tryAcquirePermit() } @@ -103,18 +96,18 @@ func (s *openState[R]) checkThresholdAndReleasePermit(_ failsafe.Execution[R]) { } type halfOpenState[R any] struct { - breaker *circuitBreaker[R] - stats circuitStats + breaker *circuitBreaker[R] + stats permittedExecutions uint } func newHalfOpenState[R any](breaker *circuitBreaker[R]) *halfOpenState[R] { - capacity := breaker.config.successThresholdingCapacity + capacity := breaker.successThresholdingCapacity if capacity == 0 { - capacity = breaker.config.failureExecutionThreshold + capacity = breaker.failureExecutionThreshold } if capacity == 0 { - capacity = breaker.config.failureThresholdingCapacity + capacity = breaker.failureThresholdingCapacity } return &halfOpenState[R]{ breaker: breaker, @@ -123,15 +116,11 @@ func newHalfOpenState[R any](breaker *circuitBreaker[R]) *halfOpenState[R] { } } -func (s *halfOpenState[R]) getState() State { +func (s *halfOpenState[R]) state() State { return HalfOpenState } -func (s *halfOpenState[R]) getStats() circuitStats { - return s.stats -} - -func (s *halfOpenState[R]) getRemainingDelay() time.Duration { +func (s *halfOpenState[R]) remainingDelay() time.Duration { return 0 } @@ -154,24 +143,24 @@ func (s *halfOpenState[R]) checkThresholdAndReleasePermit(exec failsafe.Executio var successesExceeded bool var failuresExceeded bool - successThreshold := s.breaker.config.successThreshold + successThreshold := s.breaker.successThreshold if successThreshold != 0 { - successThresholdingCapacity := s.breaker.config.successThresholdingCapacity - successesExceeded = s.stats.getSuccessCount() >= successThreshold - failuresExceeded = s.stats.getFailureCount() > successThresholdingCapacity-successThreshold + successThresholdingCapacity := s.breaker.successThresholdingCapacity + successesExceeded = s.successCount() >= successThreshold + failuresExceeded = s.failureCount() > successThresholdingCapacity-successThreshold } else { // Failure rate threshold can only be set for time based thresholding - failureRateThreshold := s.breaker.config.failureRateThreshold + failureRateThreshold := s.breaker.failureRateThreshold if failureRateThreshold != 0 { // Execution threshold can only be set for time based thresholding - executionThresholdExceeded := s.stats.getExecutionCount() >= s.breaker.config.failureExecutionThreshold - failuresExceeded = executionThresholdExceeded && s.stats.getFailureRate() >= failureRateThreshold - successesExceeded = executionThresholdExceeded && s.stats.getSuccessRate() > 100-failureRateThreshold + executionThresholdExceeded := s.executionCount() >= s.breaker.failureExecutionThreshold + failuresExceeded = executionThresholdExceeded && s.failureRate() >= failureRateThreshold + successesExceeded = executionThresholdExceeded && s.successRate() > 100-failureRateThreshold } else { - failureThresholdingCapacity := s.breaker.config.failureThresholdingCapacity - failureThreshold := s.breaker.config.failureThreshold - failuresExceeded = s.stats.getFailureCount() >= failureThreshold - successesExceeded = s.stats.getSuccessCount() > failureThresholdingCapacity-failureThreshold + failureThresholdingCapacity := s.breaker.failureThresholdingCapacity + failureThreshold := s.breaker.failureThreshold + failuresExceeded = s.failureCount() >= failureThreshold + successesExceeded = s.successCount() > failureThresholdingCapacity-failureThreshold } } diff --git a/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitstats.go b/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitstats.go index 5eff59bfe4b..2aee51b2eb2 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitstats.go +++ b/vendor/github.com/failsafe-go/failsafe-go/circuitbreaker/circuitstats.go @@ -9,13 +9,14 @@ import ( "github.com/failsafe-go/failsafe-go/internal/util" ) -// stats for a CircuitBreaker. -type circuitStats interface { - getExecutionCount() uint - getFailureCount() uint - getFailureRate() uint - getSuccessCount() uint - getSuccessRate() uint +// Stats for a CircuitBreaker. +// Implementations are not concurrency safe and must be guarded externally. +type stats interface { + executionCount() uint + failureCount() uint + failureRate() uint + successCount() uint + successRate() uint recordFailure() recordSuccess() reset() @@ -24,8 +25,8 @@ type circuitStats interface { // The default number of buckets to aggregate time-based stats into. const defaultBucketCount = 10 -// A circuitStats implementation that counts execution results using a BitSet. -type countingCircuitStats struct { +// A stats implementation that counts execution results using a BitSet. +type countingStats struct { bitSet *bitset.BitSet size uint @@ -36,15 +37,15 @@ type countingCircuitStats struct { failures uint } -func newStats[R any](config *circuitBreakerConfig[R], supportsTimeBased bool, capacity uint) circuitStats { +func newStats[R any](config *config[R], supportsTimeBased bool, capacity uint) stats { if supportsTimeBased && config.failureThresholdingPeriod != 0 { - return newTimedCircuitStats(defaultBucketCount, config.failureThresholdingPeriod, config.clock) + return newTimedStats(defaultBucketCount, config.failureThresholdingPeriod, config.clock) } - return newCountingCircuitStats(capacity) + return newCountingStats(capacity) } -func newCountingCircuitStats(size uint) *countingCircuitStats { - return &countingCircuitStats{ +func newCountingStats(size uint) *countingStats { + return &countingStats{ bitSet: bitset.New(size), size: size, } @@ -55,7 +56,7 @@ Sets the value of the next bit in the bitset, returning the previous value, else value is true if positive/success, false if negative/failure */ -func (c *countingCircuitStats) setNext(value bool) int { +func (c *countingStats) setNext(value bool) int { previousValue := -1 if c.occupiedBits < c.size { c.occupiedBits++ @@ -89,48 +90,48 @@ func (c *countingCircuitStats) setNext(value bool) int { return previousValue } -func (c *countingCircuitStats) indexAfter(index uint) uint { +func (c *countingStats) indexAfter(index uint) uint { if index == c.size-1 { return 0 } return index + 1 } -func (c *countingCircuitStats) getExecutionCount() uint { +func (c *countingStats) executionCount() uint { return c.occupiedBits } -func (c *countingCircuitStats) getFailureCount() uint { +func (c *countingStats) failureCount() uint { return c.failures } -func (c *countingCircuitStats) getFailureRate() uint { +func (c *countingStats) failureRate() uint { if c.occupiedBits == 0 { return 0 } return uint(math.Round(float64(c.failures) / float64(c.occupiedBits) * 100.0)) } -func (c *countingCircuitStats) getSuccessCount() uint { +func (c *countingStats) successCount() uint { return c.successes } -func (c *countingCircuitStats) getSuccessRate() uint { +func (c *countingStats) successRate() uint { if c.occupiedBits == 0 { return 0 } return uint(math.Round(float64(c.successes) / float64(c.occupiedBits) * 100.0)) } -func (c *countingCircuitStats) recordFailure() { +func (c *countingStats) recordFailure() { c.setNext(false) } -func (c *countingCircuitStats) recordSuccess() { +func (c *countingStats) recordSuccess() { c.setNext(true) } -func (c *countingCircuitStats) reset() { +func (c *countingStats) reset() { c.bitSet.ClearAll() c.currentIndex = 0 c.occupiedBits = 0 @@ -138,21 +139,16 @@ func (c *countingCircuitStats) reset() { c.failures = 0 } -// A circuitStats implementation that counts execution results within a time period, and buckets results to minimize overhead. -type timedCircuitStats struct { +// A stats implementation that counts execution results within a time period, and buckets results to minimize overhead. +type timedStats struct { clock util.Clock - bucketSize time.Duration - windowSize time.Duration + bucketSize int64 // Mutable state - buckets []*bucket - summary stat - currentIndex int -} - -type bucket struct { - *stat - startTime int64 + buckets []stat + summary stat + currentIndex int + currentBucketStartTime int64 } type stat struct { @@ -165,114 +161,94 @@ func (s *stat) reset() { s.failures = 0 } -func (s *stat) add(bucket *bucket) { - s.successes += bucket.successes - s.failures += bucket.failures -} - -func (s *stat) remove(bucket *bucket) { +func (s *stat) remove(bucket *stat) { s.successes -= bucket.successes s.failures -= bucket.failures } -func newTimedCircuitStats(bucketCount int, thresholdingPeriod time.Duration, clock util.Clock) *timedCircuitStats { - buckets := make([]*bucket, bucketCount) +func newTimedStats(bucketCount int, thresholdingPeriod time.Duration, clock util.Clock) *timedStats { + buckets := make([]stat, bucketCount) for i := 0; i < bucketCount; i++ { - buckets[i] = &bucket{ - stat: &stat{}, - startTime: -1, - } + buckets[i] = stat{} } - buckets[0].startTime = clock.CurrentUnixNano() - result := &timedCircuitStats{ - buckets: buckets, - windowSize: thresholdingPeriod, - bucketSize: thresholdingPeriod / time.Duration(bucketCount), - clock: clock, - summary: stat{}, + bucketSize := (thresholdingPeriod / time.Duration(bucketCount)).Nanoseconds() + result := &timedStats{ + buckets: buckets, + bucketSize: bucketSize, + clock: clock, + summary: stat{}, + currentBucketStartTime: util.RoundDown(clock.CurrentUnixNano(), bucketSize), } return result } -func (s *timedCircuitStats) getCurrentBucket() *bucket { - previousBucket := s.buckets[s.currentIndex] - currentBucket := previousBucket - timeDiff := s.clock.CurrentUnixNano() - currentBucket.startTime - if timeDiff >= s.bucketSize.Nanoseconds() { - bucketsToMove := int(timeDiff / s.bucketSize.Nanoseconds()) - if bucketsToMove <= len(s.buckets) { - // Reset some buckets - for ; bucketsToMove > 0; bucketsToMove-- { - s.currentIndex = s.nextIndex() - previousBucket = currentBucket - currentBucket = s.buckets[s.currentIndex] - var bucketStartTime int64 - if currentBucket.startTime == -1 { - bucketStartTime = previousBucket.startTime + s.bucketSize.Nanoseconds() - } else { - bucketStartTime = currentBucket.startTime + s.windowSize.Nanoseconds() - } - s.summary.remove(currentBucket) - currentBucket.reset() - currentBucket.startTime = bucketStartTime - } - } else { - // Reset all buckets - s.reset() +func (s *timedStats) getCurrentBucket() *stat { + currentBucket := &s.buckets[s.currentIndex] + now := s.clock.CurrentUnixNano() + timeDiff := now - s.currentBucketStartTime + bucketsToMove := int(timeDiff / s.bucketSize) + + if bucketsToMove > len(s.buckets) { + // Reset all buckets + s.reset() + } else { + // Reset some buckets + for i := 0; i < bucketsToMove; i++ { + s.currentIndex = (s.currentIndex + 1) % len(s.buckets) + currentBucket = &s.buckets[s.currentIndex] + s.summary.remove(currentBucket) + currentBucket.reset() } } + if bucketsToMove > 0 { + s.currentBucketStartTime = util.RoundDown(now, s.bucketSize) + } return currentBucket } -func (s *timedCircuitStats) nextIndex() int { - return (s.currentIndex + 1) % len(s.buckets) -} - -func (s *timedCircuitStats) getExecutionCount() uint { +func (s *timedStats) executionCount() uint { return s.summary.successes + s.summary.failures } -func (s *timedCircuitStats) getFailureCount() uint { +func (s *timedStats) failureCount() uint { return s.summary.failures } -func (s *timedCircuitStats) getFailureRate() uint { - executions := s.getExecutionCount() +func (s *timedStats) failureRate() uint { + executions := s.executionCount() if executions == 0 { return 0 } return uint(math.Round(float64(s.summary.failures) / float64(executions) * 100.0)) } -func (s *timedCircuitStats) getSuccessCount() uint { +func (s *timedStats) successCount() uint { return s.summary.successes } -func (s *timedCircuitStats) getSuccessRate() uint { - executions := s.getExecutionCount() +func (s *timedStats) successRate() uint { + executions := s.executionCount() if executions == 0 { return 0 } return uint(math.Round(float64(s.summary.successes) / float64(executions) * 100.0)) } -func (s *timedCircuitStats) recordFailure() { +func (s *timedStats) recordFailure() { s.getCurrentBucket().failures++ s.summary.failures++ } -func (s *timedCircuitStats) recordSuccess() { +func (s *timedStats) recordSuccess() { s.getCurrentBucket().successes++ s.summary.successes++ } -func (s *timedCircuitStats) reset() { - startTime := s.clock.CurrentUnixNano() - for _, bucket := range s.buckets { +func (s *timedStats) reset() { + for i := range s.buckets { + bucket := &s.buckets[i] bucket.reset() - bucket.startTime = startTime - startTime += s.bucketSize.Nanoseconds() } s.summary.reset() s.currentIndex = 0 diff --git a/vendor/github.com/failsafe-go/failsafe-go/doc.go b/vendor/github.com/failsafe-go/failsafe-go/doc.go index eb621274bcd..c727f58677b 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/doc.go +++ b/vendor/github.com/failsafe-go/failsafe-go/doc.go @@ -1,4 +1,4 @@ -// Package failsafe provides the entrypoint for using Failsafe-go. +// Package failsafe provides fault tolerance and resilience patterns. // // Failsafe-go adds fault tolerance to function execution. Functions can be wrapped with one or more resilience policies, for example: // diff --git a/vendor/github.com/failsafe-go/failsafe-go/events.go b/vendor/github.com/failsafe-go/failsafe-go/events.go index 42ae5b558e8..37046294286 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/events.go +++ b/vendor/github.com/failsafe-go/failsafe-go/events.go @@ -20,17 +20,17 @@ type ExecutionScheduledEvent[R any] struct { // ExecutionDoneEvent indicates an execution is done. type ExecutionDoneEvent[R any] struct { - ExecutionStats + ExecutionInfo // The execution result, else the zero value for R Result R // The execution error, else nil Error error } -func newExecutionDoneEvent[R any](stats ExecutionStats, er *common.PolicyResult[R]) ExecutionDoneEvent[R] { +func newExecutionDoneEvent[R any](info ExecutionInfo, er *common.PolicyResult[R]) ExecutionDoneEvent[R] { return ExecutionDoneEvent[R]{ - ExecutionStats: stats, - Result: er.Result, - Error: er.Error, + ExecutionInfo: info, + Result: er.Result, + Error: er.Error, } } diff --git a/vendor/github.com/failsafe-go/failsafe-go/execution.go b/vendor/github.com/failsafe-go/failsafe-go/execution.go index 96d9b62bcb4..6d673c5e6b1 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/execution.go +++ b/vendor/github.com/failsafe-go/failsafe-go/execution.go @@ -9,8 +9,12 @@ import ( "github.com/failsafe-go/failsafe-go/common" ) -// ExecutionStats contains execution stats. -type ExecutionStats interface { +// ExecutionInfo contains execution info. +type ExecutionInfo interface { + // Context returns the context configured for the execution, else context.Background if none was configured. For + // executions involving a timeout or hedge, each attempt will get a separate child context. + Context() context.Context + // Attempts returns the number of execution attempts so far, including attempts that are currently in progress and // attempts that were blocked before being executed, such as by a CircuitBreaker or RateLimiter. These can include an initial // execution along with retries and hedges. @@ -35,7 +39,7 @@ type ExecutionStats interface { // ExecutionAttempt contains information for an execution attempt. type ExecutionAttempt[R any] interface { - ExecutionStats + ExecutionInfo // LastResult returns the result, if any, from the last execution attempt. LastResult() R @@ -63,10 +67,6 @@ type ExecutionAttempt[R any] interface { type Execution[R any] interface { ExecutionAttempt[R] - // Context returns the context configured for the execution, else context.Background if none was configured. For - // executions involving a timeout or hedge, each attempt will get a separate child context. - Context() context.Context - // IsCanceled returns whether the execution has been canceled by an external Context or a timeout.Timeout. IsCanceled() bool @@ -106,7 +106,7 @@ type execution[R any] struct { } var _ Execution[any] = &execution[any]{} -var _ ExecutionStats = &execution[any]{} +var _ ExecutionInfo = &execution[any]{} func (e *execution[R]) Attempts() int { return int(e.attempts.Load()) @@ -205,11 +205,11 @@ func (e *execution[R]) InitializeRetry() *common.PolicyResult[R] { return nil } -func (e *execution[R]) Cancel(result *common.PolicyResult[R]) *common.PolicyResult[R] { +func (e *execution[R]) Cancel(result *common.PolicyResult[R]) { e.mtx.Lock() defer e.mtx.Unlock() - if canceled, cancelResult := e.isCanceledWithResult(); canceled { - return cancelResult + if canceled, _ := e.isCanceledWithResult(); canceled { + return } *e.canceledResult = result @@ -220,7 +220,6 @@ func (e *execution[R]) Cancel(result *common.PolicyResult[R]) *common.PolicyResu if e.cancelFunc != nil { e.cancelFunc() } - return result } func (e *execution[R]) IsCanceledWithResult() (bool, *common.PolicyResult[R]) { @@ -263,6 +262,7 @@ func (e *execution[R]) CopyForHedge() Execution[R] { c.isHedge = true c.attempts.Add(1) c.hedges.Add(1) + c.ctx, c.cancelFunc = context.WithCancel(c.ctx) return c } diff --git a/vendor/github.com/failsafe-go/failsafe-go/executor.go b/vendor/github.com/failsafe-go/failsafe-go/executor.go index 3a108657599..98294dcd1d4 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/executor.go +++ b/vendor/github.com/failsafe-go/failsafe-go/executor.go @@ -167,14 +167,14 @@ func (e *executor[R]) OnFailure(listener func(ExecutionDoneEvent[R])) Executor[R func (e *executor[R]) Run(fn func() error) error { _, err := e.executeSync(func(_ Execution[R]) (R, error) { - return *(new(R)), fn() + return *new(R), fn() }, false) return err } func (e *executor[R]) RunWithExecution(fn func(exec Execution[R]) error) error { _, err := e.executeSync(func(exec Execution[R]) (R, error) { - return *(new(R)), fn(exec) + return *new(R), fn(exec) }, true) return err } @@ -193,13 +193,13 @@ func (e *executor[R]) GetWithExecution(fn func(exec Execution[R]) (R, error)) (R func (e *executor[R]) RunAsync(fn func() error) ExecutionResult[R] { return e.executeAsync(func(_ Execution[R]) (R, error) { - return *(new(R)), fn() + return *new(R), fn() }, false) } func (e *executor[R]) RunWithExecutionAsync(fn func(exec Execution[R]) error) ExecutionResult[R] { return e.executeAsync(func(exec Execution[R]) (R, error) { - return *(new(R)), fn(exec) + return *new(R), fn(exec) }, true) } @@ -264,7 +264,7 @@ func (e *executor[R]) execute(fn func(exec Execution[R]) (R, error), outerExec * // Compose policy executors from the innermost policy to the outermost for i := len(e.policies) - 1; i >= 0; i-- { - pe := e.policies[i].ToExecutor(*(new(R))).(policyExecutor[R]) + pe := e.policies[i].ToExecutor(*new(R)).(policyExecutor[R]) outerFn = pe.Apply(outerFn) } diff --git a/vendor/github.com/failsafe-go/failsafe-go/internal/util/util.go b/vendor/github.com/failsafe-go/failsafe-go/internal/util/util.go index 8657fb66425..a16ce1d27af 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/internal/util/util.go +++ b/vendor/github.com/failsafe-go/failsafe-go/internal/util/util.go @@ -1,6 +1,8 @@ package util import ( + "context" + "reflect" "time" ) @@ -8,6 +10,83 @@ type number interface { ~int | ~int64 | ~uint | ~uint64 } +func noop(_ error) {} + +var errorType = reflect.TypeOf((*error)(nil)).Elem() + +// ErrorTypesMatch indicates whether the err or any unwrapped causes of the err are assignable to the target type. This is +// similar to the test that errors.As performs, but does not actually assign a value and allows a non-pointer target. +// This method also allows a non-pointer target for an error that's implemented with pointer receivers. +// Panics if target is nil or not an error. +func ErrorTypesMatch(err error, target any) bool { + if err == nil { + return false + } + if target == nil { + panic("target cannot be nil") + } + targetType := reflect.TypeOf(target) + if targetType.Kind() == reflect.Ptr { + targetType = targetType.Elem() + } + if targetType.Kind() != reflect.Interface && !targetType.Implements(errorType) { + // If targetType is not an error, convert it to a pointer and check again + targetType = reflect.PointerTo(targetType) + if !targetType.Implements(errorType) { + panic("target must be interface or implement error") + } + } + return errorAs(err, targetType) +} + +func errorAs(err error, targetType reflect.Type) bool { + for { + if reflect.TypeOf(err).AssignableTo(targetType) { + return true + } + switch x := err.(type) { + case interface{ Unwrap() error }: + err = x.Unwrap() + if err == nil { + return false + } + case interface{ Unwrap() []error }: + for _, err := range x.Unwrap() { + if err == nil { + continue + } + if errorAs(err, targetType) { + return true + } + } + return false + default: + return false + } + } +} + +// MergeContexts returns a context that is canceled when either ctx1 or ctx2 are Done. +func MergeContexts(ctx1, ctx2 context.Context) (context.Context, context.CancelCauseFunc) { + bgContext := context.Background() + if ctx1 == bgContext { + return ctx2, noop + } + if ctx2 == bgContext { + return ctx1, noop + } + ctx, cancel := context.WithCancelCause(context.Background()) + go func() { + select { + case <-ctx1.Done(): + cancel(ctx1.Err()) + case <-ctx2.Done(): + cancel(ctx2.Err()) + } + }() + return ctx, cancel +} + // AppliesToAny returns true if any of the biPredicates evaluate to true for the values. func AppliesToAny[A any, B any](biPredicates []func(A, B) bool, value1 A, value2 B) bool { for _, p := range biPredicates { @@ -19,8 +98,8 @@ func AppliesToAny[A any, B any](biPredicates []func(A, B) bool, value1 A, value2 } // RoundDown returns the input rounded down to the nearest interval. -func RoundDown(input time.Duration, interval time.Duration) time.Duration { - return (input / interval) * interval +func RoundDown[T number](input T, interval T) T { + return input - input%interval } func RandomDelayInRange[T number](delayMin T, delayMax T, random float64) T { diff --git a/vendor/github.com/failsafe-go/failsafe-go/policy.go b/vendor/github.com/failsafe-go/failsafe-go/policy.go index 4671701ee14..f5a40eee767 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/policy.go +++ b/vendor/github.com/failsafe-go/failsafe-go/policy.go @@ -21,9 +21,14 @@ FailurePolicyBuilder builds a Policy that allows configurable conditions to dete policy handling. */ type FailurePolicyBuilder[S any, R any] interface { - // HandleErrors specifies the errors to handle as failures. Any errors that evaluate to true for errors.Is and the + // HandleErrors specifies the errors to handle as failures. Any errs that evaluate to true for errors.Is and the // execution error will be handled. - HandleErrors(errors ...error) S + HandleErrors(errs ...error) S + + // HandleErrorTypes specifies the errors whose types should be handled as failures. Any execution errors or their + // Unwrapped parents whose type matches any of the errs' types will be handled. This is similar to the check that + // errors.As performs. + HandleErrorTypes(errs ...any) S // HandleResult specifies the results to handle as failures. Any result that evaluates to true for reflect.DeepEqual and // the execution result will be handled. This method is only considered when a result is returned from an execution, not @@ -49,6 +54,6 @@ type DelayablePolicyBuilder[S any, R any] interface { // WithDelay configures the time to delay between execution attempts. WithDelay(delay time.Duration) S - // WithDelayFunc accepts a function that configures the time to delay before the next execution attempt. + // WithDelayFunc configures a function that returns the time to delay before the next execution attempt. WithDelayFunc(delayFunc DelayFunc[R]) S } diff --git a/vendor/github.com/failsafe-go/failsafe-go/policy/execution.go b/vendor/github.com/failsafe-go/failsafe-go/policy/execution.go index 7b8b169eac8..5ff94c3a0b6 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/policy/execution.go +++ b/vendor/github.com/failsafe-go/failsafe-go/policy/execution.go @@ -17,7 +17,7 @@ type ExecutionInternal[R any] interface { InitializeRetry() *common.PolicyResult[R] // Cancel cancels the execution with the result. - Cancel(result *common.PolicyResult[R]) *common.PolicyResult[R] + Cancel(result *common.PolicyResult[R]) // IsCanceledWithResult returns whether the execution is canceled, along with the cancellation result, if any. IsCanceledWithResult() (bool, *common.PolicyResult[R]) diff --git a/vendor/github.com/failsafe-go/failsafe-go/policy/policy.go b/vendor/github.com/failsafe-go/failsafe-go/policy/policy.go index 58a769011ec..da56c99def5 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/policy/policy.go +++ b/vendor/github.com/failsafe-go/failsafe-go/policy/policy.go @@ -29,6 +29,16 @@ func (p *BaseFailurePolicy[R]) HandleErrors(errs ...error) { p.errorsChecked = true } +func (p *BaseFailurePolicy[R]) HandleErrorTypes(errs ...any) { + for _, target := range errs { + t := target + p.failureConditions = append(p.failureConditions, func(r R, actualErr error) bool { + return util.ErrorTypesMatch(actualErr, t) + }) + } + p.errorsChecked = true +} + func (p *BaseFailurePolicy[R]) HandleResult(result R) { p.failureConditions = append(p.failureConditions, func(r R, err error) bool { return reflect.DeepEqual(r, result) @@ -103,6 +113,15 @@ func (c *BaseAbortablePolicy[R]) AbortOnErrors(errs ...error) { } } +func (c *BaseAbortablePolicy[R]) AbortOnErrorTypes(errs ...any) { + for _, target := range errs { + t := target + c.abortConditions = append(c.abortConditions, func(result R, actualErr error) bool { + return util.ErrorTypesMatch(actualErr, t) + }) + } +} + func (c *BaseAbortablePolicy[R]) AbortIf(predicate func(R, error) bool) { c.abortConditions = append(c.abortConditions, func(result R, err error) bool { return predicate(result, err) diff --git a/vendor/github.com/failsafe-go/failsafe-go/result.go b/vendor/github.com/failsafe-go/failsafe-go/result.go index c5fbcd23033..f994d95f1d5 100644 --- a/vendor/github.com/failsafe-go/failsafe-go/result.go +++ b/vendor/github.com/failsafe-go/failsafe-go/result.go @@ -60,7 +60,7 @@ func (e *executionResult[R]) Get() (R, error) { if result != nil { return (*result).Result, (*result).Error } - return *(new(R)), nil + return *new(R), nil } func (e *executionResult[R]) Result() R { diff --git a/vendor/modules.txt b/vendor/modules.txt index 35ccfd1208a..8ed9dce0258 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -342,7 +342,7 @@ github.com/efficientgo/core/testutil/internal # github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb ## explicit github.com/facette/natsort -# github.com/failsafe-go/failsafe-go v0.6.2 +# github.com/failsafe-go/failsafe-go v0.6.8 ## explicit; go 1.21 github.com/failsafe-go/failsafe-go github.com/failsafe-go/failsafe-go/circuitbreaker