Skip to content

Commit

Permalink
Rename
Browse files Browse the repository at this point in the history
  • Loading branch information
YoshiyukiMineo committed Nov 16, 2024
1 parent c1a3eca commit c301dea
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 117 deletions.
95 changes: 47 additions & 48 deletions v2/distributed_gobreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"
)

// SharedState represents the CircuitBreaker state stored in Distributed Storage
// SharedState represents the shared state of DistributedCircuitBreaker.
type SharedState struct {
State State `json:"state"`
Generation uint64 `json:"generation"`
Expand All @@ -26,32 +26,32 @@ type DistributedCircuitBreaker[T any] struct {
}

// NewDistributedCircuitBreaker returns a new DistributedCircuitBreaker configured with the given StorageSettings
func NewDistributedCircuitBreaker[T any](storageClient SharedStateStore, settings Settings) *DistributedCircuitBreaker[T] {
func NewDistributedCircuitBreaker[T any](store SharedStateStore, settings Settings) *DistributedCircuitBreaker[T] {
cb := NewCircuitBreaker[T](settings)
return &DistributedCircuitBreaker[T]{
CircuitBreaker: cb,
store: storageClient,
store: store,
}
}

func (rcb *DistributedCircuitBreaker[T]) State(ctx context.Context) State {
if rcb.store == nil {
return rcb.CircuitBreaker.State()
func (dcb *DistributedCircuitBreaker[T]) State(ctx context.Context) State {
if dcb.store == nil {
return dcb.CircuitBreaker.State()
}

state, err := rcb.store.GetState(ctx)
state, err := dcb.store.GetState(ctx)
if err != nil {
// Fallback to in-memory state if Storage fails
return rcb.CircuitBreaker.State()
return dcb.CircuitBreaker.State()
}

now := time.Now()
currentState, _ := rcb.currentState(state, now)
currentState, _ := dcb.currentState(state, now)

// Update the state in Storage if it has changed
if currentState != state.State {
state.State = currentState
if err := rcb.store.SetState(ctx, state); err != nil {
if err := dcb.store.SetState(ctx, state); err != nil {
// Log the error, but continue with the current state
fmt.Printf("Failed to update state in storage: %v\n", err)
}
Expand All @@ -61,11 +61,11 @@ func (rcb *DistributedCircuitBreaker[T]) State(ctx context.Context) State {
}

// Execute runs the given request if the DistributedCircuitBreaker accepts it
func (rcb *DistributedCircuitBreaker[T]) Execute(ctx context.Context, req func() (T, error)) (T, error) {
if rcb.store == nil {
return rcb.CircuitBreaker.Execute(req)
func (dcb *DistributedCircuitBreaker[T]) Execute(ctx context.Context, req func() (T, error)) (T, error) {
if dcb.store == nil {
return dcb.CircuitBreaker.Execute(req)
}
generation, err := rcb.beforeRequest(ctx)
generation, err := dcb.beforeRequest(ctx)
if err != nil {
var zero T
return zero, err
Expand All @@ -74,69 +74,69 @@ func (rcb *DistributedCircuitBreaker[T]) Execute(ctx context.Context, req func()
defer func() {
e := recover()
if e != nil {
rcb.afterRequest(ctx, generation, false)
dcb.afterRequest(ctx, generation, false)
panic(e)
}
}()

result, err := req()
rcb.afterRequest(ctx, generation, rcb.isSuccessful(err))
dcb.afterRequest(ctx, generation, dcb.isSuccessful(err))

return result, err
}

func (rcb *DistributedCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, error) {
state, err := rcb.store.GetState(ctx)
func (dcb *DistributedCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, error) {
state, err := dcb.store.GetState(ctx)
if err != nil {
return 0, err
}
now := time.Now()
currentState, generation := rcb.currentState(state, now)
currentState, generation := dcb.currentState(state, now)

if currentState != state.State {
rcb.setState(&state, currentState, now)
err = rcb.store.SetState(ctx, state)
dcb.setState(&state, currentState, now)
err = dcb.store.SetState(ctx, state)
if err != nil {
return 0, err
}
}

if currentState == StateOpen {
return generation, ErrOpenState
} else if currentState == StateHalfOpen && state.Counts.Requests >= rcb.maxRequests {
} else if currentState == StateHalfOpen && state.Counts.Requests >= dcb.maxRequests {
return generation, ErrTooManyRequests
}

state.Counts.onRequest()
err = rcb.store.SetState(ctx, state)
err = dcb.store.SetState(ctx, state)
if err != nil {
return 0, err
}

return generation, nil
}

func (rcb *DistributedCircuitBreaker[T]) afterRequest(ctx context.Context, before uint64, success bool) {
state, err := rcb.store.GetState(ctx)
func (dcb *DistributedCircuitBreaker[T]) afterRequest(ctx context.Context, before uint64, success bool) {
state, err := dcb.store.GetState(ctx)
if err != nil {
return
}
now := time.Now()
currentState, generation := rcb.currentState(state, now)
currentState, generation := dcb.currentState(state, now)
if generation != before {
return
}

if success {
rcb.onSuccess(&state, currentState, now)
dcb.onSuccess(&state, currentState, now)
} else {
rcb.onFailure(&state, currentState, now)
dcb.onFailure(&state, currentState, now)
}

rcb.store.SetState(ctx, state)
dcb.store.SetState(ctx, state)
}

func (rcb *DistributedCircuitBreaker[T]) onSuccess(state *SharedState, currentState State, now time.Time) {
func (dcb *DistributedCircuitBreaker[T]) onSuccess(state *SharedState, currentState State, now time.Time) {
if state.State == StateOpen {
state.State = currentState
}
Expand All @@ -146,68 +146,67 @@ func (rcb *DistributedCircuitBreaker[T]) onSuccess(state *SharedState, currentSt
state.Counts.onSuccess()
case StateHalfOpen:
state.Counts.onSuccess()
if state.Counts.ConsecutiveSuccesses >= rcb.maxRequests {
rcb.setState(state, StateClosed, now)
if state.Counts.ConsecutiveSuccesses >= dcb.maxRequests {
dcb.setState(state, StateClosed, now)
}
}
}

func (rcb *DistributedCircuitBreaker[T]) onFailure(state *SharedState, currentState State, now time.Time) {
func (dcb *DistributedCircuitBreaker[T]) onFailure(state *SharedState, currentState State, now time.Time) {
switch currentState {
case StateClosed:
state.Counts.onFailure()
if rcb.readyToTrip(state.Counts) {
rcb.setState(state, StateOpen, now)
if dcb.readyToTrip(state.Counts) {
dcb.setState(state, StateOpen, now)
}
case StateHalfOpen:
rcb.setState(state, StateOpen, now)
dcb.setState(state, StateOpen, now)
}
}

func (rcb *DistributedCircuitBreaker[T]) currentState(state SharedState, now time.Time) (State, uint64) {
func (dcb *DistributedCircuitBreaker[T]) currentState(state SharedState, now time.Time) (State, uint64) {
switch state.State {
case StateClosed:
if !state.Expiry.IsZero() && state.Expiry.Before(now) {
rcb.toNewGeneration(&state, now)
dcb.toNewGeneration(&state, now)
}
case StateOpen:
if state.Expiry.Before(now) {
rcb.setState(&state, StateHalfOpen, now)
dcb.setState(&state, StateHalfOpen, now)
}
}
return state.State, state.Generation
}

func (rcb *DistributedCircuitBreaker[T]) setState(state *SharedState, newState State, now time.Time) {
func (dcb *DistributedCircuitBreaker[T]) setState(state *SharedState, newState State, now time.Time) {
if state.State == newState {
return
}

prev := state.State
state.State = newState

rcb.toNewGeneration(state, now)
dcb.toNewGeneration(state, now)

if rcb.onStateChange != nil {
rcb.onStateChange(rcb.name, prev, newState)
if dcb.onStateChange != nil {
dcb.onStateChange(dcb.name, prev, newState)
}
}

func (rcb *DistributedCircuitBreaker[T]) toNewGeneration(state *SharedState, now time.Time) {

func (dcb *DistributedCircuitBreaker[T]) toNewGeneration(state *SharedState, now time.Time) {
state.Generation++
state.Counts.clear()

var zero time.Time
switch state.State {
case StateClosed:
if rcb.interval == 0 {
if dcb.interval == 0 {
state.Expiry = zero
} else {
state.Expiry = now.Add(rcb.interval)
state.Expiry = now.Add(dcb.interval)
}
case StateOpen:
state.Expiry = now.Add(rcb.timeout)
state.Expiry = now.Add(dcb.timeout)
default: // StateHalfOpen
state.Expiry = zero
}
Expand Down
Loading

0 comments on commit c301dea

Please sign in to comment.