-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #4 from kinfinity/feat/circuit-breaker
Circuit Breaker Pattern
- Loading branch information
Showing
4 changed files
with
340 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,189 @@ | ||
package circuitbreaker | ||
|
||
// Models Circuit Breaker and Handles lifecycle up to re connection | ||
import ( | ||
"context" | ||
"errors" | ||
"log" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/kinfinity/distributed-resilience/retry" | ||
|
||
) | ||
|
||
// Circuit Breaker | ||
type CircuitBreaker struct { | ||
recoveryTime time.Duration | ||
recovered bool | ||
maxFailures int32 | ||
failCount *atomic.Int32 | ||
successCount *atomic.Int32 // what if a certain number of concurrent successful executions are required? | ||
maxSuccess int32 // ? | ||
currentState breakerState | ||
lastChangeTime int64 | ||
ctx context.Context // Set Life Cycle Expiry Timer ? | ||
completion chan bool | ||
lastError error | ||
fallback func(error) // Default FallBack Functionality - Kicks in when state switches to Open | ||
retryPin *retry.Retry | ||
} | ||
|
||
// BreakerOptions | ||
type BreakerOptions struct { | ||
RecoveryTime time.Duration | ||
MaxFailures int32 | ||
MaxSuccess int32 | ||
ctx context.Context | ||
} | ||
|
||
// State | ||
type breakerState uint8 | ||
|
||
const ( | ||
closed breakerState = iota // Forward Requests | ||
halfOpen // Error detected w no ful fail over yet | ||
open // Requests fail immediately - Not forwarded | ||
) | ||
|
||
// New Breaker Initialized in close state | ||
func NewCircuitBreaker(bo BreakerOptions) *CircuitBreaker { | ||
return &CircuitBreaker{ | ||
recoveryTime: bo.RecoveryTime, | ||
maxFailures: bo.MaxFailures, | ||
maxSuccess: bo.MaxSuccess, | ||
failCount: &atomic.Int32{}, | ||
successCount: &atomic.Int32{}, | ||
currentState: closed, | ||
ctx: bo.ctx, // Need context deadline for lifecycle of circuit breaker? | ||
fallback: func(err error) { | ||
// | ||
}, | ||
completion: make(chan bool, 1), // channel to watch for completion of circuit Execution | ||
recovered: false, // Flag indicating whether the Circuit has been recovered or not | ||
} | ||
} | ||
|
||
// Execute and ensure the function is executed within a circuit breaker w context | ||
func (cb *CircuitBreaker) Do(f func() error) error { | ||
cb.failCount.Store(0) | ||
cb.successCount.Store(0) | ||
|
||
select { | ||
case <-cb.ctx.Done(): | ||
// Lifecycle time is up | ||
return errors.New("circuit breaker expired") | ||
default: | ||
// Continue with the circuit breaker logic | ||
} | ||
|
||
// kick out | ||
if cb.currentState == closed && cb.failCount.Load() > cb.maxFailures { | ||
return errors.New("circuit open") | ||
} | ||
|
||
// completion | ||
if cb.currentState == closed && cb.successCount.Load() >= cb.maxSuccess { | ||
cb.completion <- true | ||
return nil | ||
} | ||
|
||
switch cb.currentState { | ||
case closed: | ||
err := f() | ||
if err != nil { | ||
// Execution of f failed mark as failure | ||
cb.recordFailure() | ||
cb.lastError = err | ||
// check if max retries has been hit & retry | ||
if cb.failCount.Load() == cb.maxFailures { | ||
// Move to half open | ||
cb.switchState() | ||
} | ||
cb.Do(f) // final Objective is to get maxSuccess on successCount ? base scenario 1 | ||
} | ||
// Execution of f succeeded | ||
cb.recordSuccess() | ||
// No need for circuit breaker Lifecyle continuation | ||
// cleanup | ||
return nil | ||
case halfOpen: | ||
err := f() | ||
if err == nil { | ||
// half-open, allowed one call to succeed | ||
cb.recordSuccess() | ||
// if it is back to normal operation with normal consecutive successes then move to close state | ||
if cb.successCount.Load() == cb.maxSuccess { | ||
cb.recovered = true | ||
cb.switchState() | ||
} | ||
} else { | ||
// still in half-open, treat as a failure | ||
cb.recordFailure() | ||
cb.retryPin = retry.NewWithBackOff( | ||
int32(2), | ||
cb.recoveryTime, | ||
[]error{cb.lastError}, | ||
retry.NewBackOff(100*time.Millisecond, true, true), | ||
) | ||
cb.switchState() | ||
} | ||
cb.lastError = err | ||
return err | ||
case open: | ||
// evaluate and decide whether or not to switch back to closed state | ||
cb.fallback(cb.lastError) | ||
default: | ||
return errors.New("unexpected state") | ||
} | ||
|
||
return cb.Do(f) | ||
} | ||
|
||
// Change the state of the circuit breaker based on current configuration | ||
func (cb *CircuitBreaker) switchState() { | ||
if !isState(cb.currentState) { | ||
panic("unexpected state") | ||
} | ||
log.Println("Changing Circuit Breaker state from", cb.currentState) | ||
switch cb.currentState { | ||
case open: | ||
cb.currentState = halfOpen | ||
case halfOpen: | ||
if cb.recovered { | ||
cb.currentState = closed | ||
} else { | ||
cb.currentState = open | ||
} | ||
default: | ||
cb.currentState++ | ||
} | ||
cb.lastChangeTime = time.Now().UnixNano() / int64(time.Millisecond) | ||
log.Println("Changing Circuit Breaker state to", cb.currentState) | ||
} | ||
|
||
func isState(state breakerState) bool { | ||
return state < open | ||
} | ||
|
||
// Record a success - resets the fail count | ||
func (cb *CircuitBreaker) recordSuccess() { | ||
cb.successCount.Add(1) | ||
cb.failCount.Store(0) | ||
} | ||
|
||
// Record a failure - increments the fail count, reset successCount | ||
func (cb *CircuitBreaker) recordFailure() { | ||
cb.failCount.Add(1) | ||
cb.successCount.Store(0) | ||
} | ||
|
||
// Checks whether enough time has passed since the last state change | ||
// func (cb *CircuitBreaker) isStale() bool { | ||
// now := time.Now().UnixNano() | ||
// delta := float64(now-cb.lastChangeTime) / 1e9 | ||
// return delta >= float64(cb.recoveryTime) | ||
// } | ||
|
||
// Duration in Context for Lifecyle Time | ||
// post execute / free resources |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
package circuitbreaker | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
// TestCircuitBreakerDo tests the Do method of CircuitBreaker | ||
func TestCircuitBreakerDo(t *testing.T) { | ||
maxFailures := int32(3) | ||
maxSuccess := int32(1) | ||
recoveryTime := 100 * time.Millisecond | ||
|
||
// Mock context with cancelation after a certain duration | ||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) | ||
defer cancel() | ||
|
||
options := BreakerOptions{ | ||
RecoveryTime: recoveryTime, | ||
MaxFailures: maxFailures, | ||
MaxSuccess: maxSuccess, | ||
ctx: ctx, | ||
} | ||
|
||
cb := NewCircuitBreaker(options) | ||
|
||
t.Run("Success", func(t *testing.T) { | ||
err := cb.Do(func() error { | ||
return nil | ||
}) | ||
|
||
// Wait for transition | ||
time.Sleep(2 * time.Second) | ||
assert.NoError(t, err) | ||
}) | ||
|
||
} | ||
|
||
// Test state switching mechanism | ||
func TestCircuitBreakerSwitchState(t *testing.T) { | ||
recoveryTime := 100 * time.Millisecond | ||
|
||
// Mock context with cancellation after a certain duration | ||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) | ||
defer cancel() | ||
|
||
options := BreakerOptions{ | ||
RecoveryTime: recoveryTime, | ||
MaxFailures: 3, | ||
MaxSuccess: 1, | ||
ctx: ctx, | ||
} | ||
|
||
cb := NewCircuitBreaker(options) | ||
|
||
t.Run("SwitchFromClosedToHalfOpen", func(t *testing.T) { | ||
cb.currentState = closed | ||
cb.failCount.Store(3) | ||
cb.switchState() | ||
assert.Equal(t, halfOpen, cb.currentState) | ||
}) | ||
|
||
t.Run("SwitchFromHalfOpenToClosed", func(t *testing.T) { | ||
cb.currentState = halfOpen | ||
cb.successCount.Store(1) | ||
cb.recovered = true | ||
cb.switchState() | ||
assert.Equal(t, closed, cb.currentState) | ||
}) | ||
|
||
t.Run("SwitchFromHalfOpenToOpen", func(t *testing.T) { | ||
cb.currentState = halfOpen | ||
cb.failCount.Store(3) | ||
cb.recovered = false | ||
cb.switchState() | ||
assert.Equal(t, open, cb.currentState) | ||
}) | ||
|
||
t.Run("UnexpectedState", func(t *testing.T) { | ||
cb.currentState = breakerState(100) // Invalid state | ||
assert.Panics(t, func() { cb.switchState() }) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
# Circuit Breaker Pattern | ||
|
||
The Circuit Breaker Pattern is designed to enhance the resilience of a system by monitoring for failures and preventing further requests to a failing service. It consists of three states: **Closed**, **Open**, and **Half-Open**. The circuit breaker transitions between these states based on the observed failures | ||
|
||
- **Closed State:** In this state, the circuit breaker allows requests to pass through. It monitors for failures, and if the failure rate exceeds a predefined threshold, it transitions to the **Open State**. | ||
- **Open State:** In this state, the circuit breaker prevents requests from reaching the service, providing a fast-fail mechanism. After a predefined timeout, the circuit breaker transitions to the **Half-Open State** to test if the service has recovered. | ||
- **Half-Open State:** In this state, the circuit breaker allows a limited number of requests to pass through. If these requests succeed, the circuit breaker transitions back to the **Closed State**; otherwise, it returns to the **Open State**. | ||
|
||
## Execution | ||
|
||
- Starts up in the closed state | ||
- on failure move to half closed | ||
- if it doesn't recovery move to open state | ||
- after recovery duration move back to half closed | ||
- if it recovers move to closed state | ||
|
||
## Components | ||
|
||
- **Circuit Breaker**: Represents the main circuit breaker object, responsible for managing the state transitions and handling requests based on the current state. | ||
- **BreakerOptions**: Defines the options for configuring the circuit breaker, such as recovery time and maximum allowed failures. | ||
- **breakerState**: Enumerates the possible states of the circuit breaker, including closed, half-open, and open. | ||
- **Retry**: Utilizes the retry package for handling retry logic when transitioning from the half-open state back to the closed state. | ||
|
||
## Usage | ||
|
||
```go | ||
package main | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"log" | ||
"time" | ||
|
||
"github.com/example/circuitbreaker" | ||
) | ||
|
||
func main() { | ||
options := circuitbreaker.BreakerOptions{ | ||
RecoveryTime: time.Minute, | ||
MaxFailures: 5, | ||
// ctx: Provide context for lifecycle time (optional) | ||
} | ||
|
||
cb := circuitbreaker.NewCircuitBreaker(options) | ||
|
||
err := cb.Do(func() error { | ||
// Simulate an operation that may fail | ||
log.Println("Executing function...") | ||
return errors.New("temporary error") | ||
}) | ||
|
||
if err != nil { | ||
log.Println("Operation failed:", err) | ||
} else { | ||
log.Println("Operation succeeded.") | ||
} | ||
} | ||
``` | ||
|
||
# **References:** | ||
|
||
- **Michael Nygard** https://pragprog.com/titles/mnee2/release-it-second-edition/ Second Edition - Stability Patterns | ||
- https://learn.microsoft.com/en-us/azure/architecture/patterns/circuit-breaker |