Skip to content

Commit

Permalink
minor: refactor fadeIn code
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Zavodskikh <[email protected]>
  • Loading branch information
Roman Zavodskikh committed Jan 29, 2024
1 parent f4db475 commit e0369d0
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 34 deletions.
28 changes: 14 additions & 14 deletions loadbalancer/algorithm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func TestApply(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
req, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil)
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand All @@ -267,14 +268,13 @@ func TestApply(t *testing.T) {
},
}
rt := p.Do([]*routing.Route{r})
endpointRegistry.Do([]*routing.Route{r})

lbctx := &routing.LBContext{
Request: req,
Route: rt[0],
LBEndpoints: rt[0].LBEndpoints,
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}
lbctx.Registry.Do([]*routing.Route{r})

h := make(map[string]int)
for i := 0; i < R; i++ {
Expand Down Expand Up @@ -304,7 +304,7 @@ func TestConsistentHashSearch(t *testing.T) {
endpointRegistry.Do([]*routing.Route{r})

ch := newConsistentHash(endpoints).(*consistentHash)
ctx := &routing.LBContext{Route: r, LBEndpoints: r.LBEndpoints, Params: map[string]interface{}{ConsistentHashKey: key}, Registry: endpointRegistry}
ctx := &routing.LBContext{Route: r, LBEndpoints: r.LBEndpoints, Params: map[string]interface{}{ConsistentHashKey: key}}
return endpoints[ch.search(key, ctx)]
}

Expand Down Expand Up @@ -347,31 +347,31 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) {
Route: route,
LBEndpoints: route.LBEndpoints,
Params: map[string]interface{}{ConsistentHashBalanceFactor: 1.25},
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}
ctx.Registry.Do([]*routing.Route{route})
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
endpointRegistry.Do([]*routing.Route{route})
noLoad := ch.Apply(ctx)
nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]interface{}{}})

if noLoad != nonBounded {
t.Error("When no endpoints are overloaded, the chosen endpoint should be the same as standard consistentHash")
}
// now we know that noLoad is the endpoint which should be requested for somekey if load is not an issue.
addInflightRequests(ctx.Registry, noLoad, 20)
addInflightRequests(endpointRegistry, noLoad, 20)
failover1 := ch.Apply(ctx)
if failover1 == nonBounded {
t.Error("When the selected endpoint is overloaded, the chosen endpoint should be different to standard consistentHash")
}

// now if 2 endpoints are overloaded, the request should go to the final endpoint
addInflightRequests(ctx.Registry, failover1, 20)
addInflightRequests(endpointRegistry, failover1, 20)
failover2 := ch.Apply(ctx)
if failover2 == nonBounded || failover2 == failover1 {
t.Error("Only the final endpoint had load below the average * balanceFactor, so it should have been selected.")
}

// now all will have same load, should select the original endpoint again
addInflightRequests(ctx.Registry, failover2, 20)
addInflightRequests(endpointRegistry, failover2, 20)
allLoaded := ch.Apply(ctx)
if allLoaded != nonBounded {
t.Error("When all endpoints have the same load, the consistentHash endpoint should be chosen again.")
Expand Down Expand Up @@ -427,26 +427,26 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
Route: route,
LBEndpoints: route.LBEndpoints,
Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor},
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}
ctx.Registry.Do([]*routing.Route{route})
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
endpointRegistry.Do([]*routing.Route{route})

for i := 0; i < 100; i++ {
ep := ch.Apply(ctx)
ifr0 := route.LBEndpoints[0].Metrics.InflightRequests()
ifr1 := route.LBEndpoints[1].Metrics.InflightRequests()
ifr2 := route.LBEndpoints[2].Metrics.InflightRequests()

assert.Equal(t, int64(ifr0), ctx.Registry.GetMetrics(route.LBEndpoints[0].Host).InflightRequests())
assert.Equal(t, int64(ifr1), ctx.Registry.GetMetrics(route.LBEndpoints[1].Host).InflightRequests())
assert.Equal(t, int64(ifr2), ctx.Registry.GetMetrics(route.LBEndpoints[2].Host).InflightRequests())
assert.Equal(t, int64(ifr0), endpointRegistry.GetMetrics(route.LBEndpoints[0].Host).InflightRequests())
assert.Equal(t, int64(ifr1), endpointRegistry.GetMetrics(route.LBEndpoints[1].Host).InflightRequests())
assert.Equal(t, int64(ifr2), endpointRegistry.GetMetrics(route.LBEndpoints[2].Host).InflightRequests())

avg := float64(ifr0+ifr1+ifr2) / 3.0
limit := int64(avg*balanceFactor) + 1
if ifr0 > limit || ifr1 > limit || ifr2 > limit {
t.Errorf("Expected in-flight requests for each endpoint to be less than %d. In-flight request counts: %d, %d, %d", limit, ifr0, ifr1, ifr2)
}
ctx.Registry.GetMetrics(ep.Host).IncInflightRequest()
endpointRegistry.GetMetrics(ep.Host).IncInflightRequest()
}
}

Expand Down
23 changes: 10 additions & 13 deletions proxy/fadein.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@ import (
"math/rand"
"time"

"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/routing"
)

func returnFast(rt *routing.Route) bool {
if rt.BackendType != eskip.LBBackend {
return true
}

return rt.LBFadeInDuration <= 0
type fadeIn struct {
rnd *rand.Rand
endpointRegistry *routing.EndpointRegistry
}

func fadeIn(lifetime time.Duration, duration time.Duration, exponent float64) float64 {
func (f *fadeIn) fadeInScore(lifetime time.Duration, duration time.Duration, exponent float64) float64 {
fadingIn := lifetime > 0 && lifetime < duration
if !fadingIn {
return 1
Expand All @@ -26,18 +22,19 @@ func fadeIn(lifetime time.Duration, duration time.Duration, exponent float64) fl
return math.Pow(float64(lifetime)/float64(duration), exponent)
}

func filterFadeIn(endpoints []routing.LBEndpoint, rt *routing.Route, registry *routing.EndpointRegistry, rnd *rand.Rand) []routing.LBEndpoint {
if returnFast(rt) {
func (f *fadeIn) filterFadeIn(endpoints []routing.LBEndpoint, rt *routing.Route) []routing.LBEndpoint {
if rt.LBFadeInDuration <= 0 {
return endpoints
}

now := time.Now()
threshold := rnd.Float64()
threshold := f.rnd.Float64()

filtered := make([]routing.LBEndpoint, 0, len(endpoints))
for _, e := range endpoints {
f := fadeIn(
now.Sub(e.Metrics.DetectedTime()),
age := now.Sub(e.Metrics.DetectedTime())
f := f.fadeInScore(
age,
rt.LBFadeInDuration,
rt.LBFadeInExponent,
)
Expand Down
2 changes: 1 addition & 1 deletion proxy/fadein_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func initializeEndpoints(endpointAges []float64, algorithmName string, fadeInDur
registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}

proxy := &Proxy{registry: registry, rnd: rand.New(loadbalancer.NewLockedSource())}
proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}}
return route, proxy, eps
}

Expand Down
8 changes: 3 additions & 5 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ type Proxy struct {
defaultHTTPStatus int
routing *routing.Routing
registry *routing.EndpointRegistry
rnd *rand.Rand
fadein *fadeIn
roundTripper http.RoundTripper
priorityRoutes []PriorityRoute
flags Flags
Expand Down Expand Up @@ -469,14 +469,13 @@ func setRequestURLForDynamicBackend(u *url.URL, stateBag map[string]interface{})
func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint {
rt := ctx.route
endpoints := rt.LBEndpoints
endpoints = filterFadeIn(endpoints, rt, p.registry, p.rnd)
endpoints = p.fadein.filterFadeIn(endpoints, rt)

lbctx := &routing.LBContext{
Request: ctx.request,
Route: rt,
LBEndpoints: endpoints,
Params: ctx.StateBag(),
Registry: p.registry,
}

e := rt.LBAlgorithm.Apply(lbctx)
Expand Down Expand Up @@ -726,6 +725,7 @@ func WithParams(p Params) *Proxy {
return &Proxy{
routing: p.Routing,
registry: p.EndpointRegistry,
fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: p.EndpointRegistry},
roundTripper: p.CustomHttpRoundTripperWrap(tr),
priorityRoutes: p.PriorityRoutes,
flags: p.Flags,
Expand All @@ -747,8 +747,6 @@ func WithParams(p Params) *Proxy {
clientTLS: tr.TLSClientConfig,
hostname: hostname,
onPanicSometimes: rate.Sometimes{First: 3, Interval: 1 * time.Minute},
/* #nosec */
rnd: rand.New(loadbalancer.NewLockedSource()),
}
}

Expand Down
1 change: 0 additions & 1 deletion routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ type LBContext struct {
Route *Route
LBEndpoints []LBEndpoint
Params map[string]interface{}
Registry *EndpointRegistry
}

// NewLBContext is used to create a new LBContext, to pass data to the
Expand Down

0 comments on commit e0369d0

Please sign in to comment.