diff --git a/loadbalancer/algorithm_test.go b/loadbalancer/algorithm_test.go index 3bef23bdac..d33ba693ed 100644 --- a/loadbalancer/algorithm_test.go +++ b/loadbalancer/algorithm_test.go @@ -259,6 +259,7 @@ func TestApply(t *testing.T) { t.Run(tt.name, func(t *testing.T) { req, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil) p := NewAlgorithmProvider() + endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) r := &routing.Route{ Route: eskip.Route{ BackendType: eskip.LBBackend, @@ -267,14 +268,13 @@ func TestApply(t *testing.T) { }, } rt := p.Do([]*routing.Route{r}) + endpointRegistry.Do([]*routing.Route{r}) lbctx := &routing.LBContext{ Request: req, Route: rt[0], LBEndpoints: rt[0].LBEndpoints, - Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}), } - lbctx.Registry.Do([]*routing.Route{r}) h := make(map[string]int) for i := 0; i < R; i++ { @@ -304,7 +304,7 @@ func TestConsistentHashSearch(t *testing.T) { endpointRegistry.Do([]*routing.Route{r}) ch := newConsistentHash(endpoints).(*consistentHash) - ctx := &routing.LBContext{Route: r, LBEndpoints: r.LBEndpoints, Params: map[string]interface{}{ConsistentHashKey: key}, Registry: endpointRegistry} + ctx := &routing.LBContext{Route: r, LBEndpoints: r.LBEndpoints, Params: map[string]interface{}{ConsistentHashKey: key}} return endpoints[ch.search(key, ctx)] } @@ -347,9 +347,9 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) { Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]interface{}{ConsistentHashBalanceFactor: 1.25}, - Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}), } - ctx.Registry.Do([]*routing.Route{route}) + endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + endpointRegistry.Do([]*routing.Route{route}) noLoad := ch.Apply(ctx) nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]interface{}{}}) @@ -357,21 +357,21 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) { t.Error("When no endpoints are overloaded, the chosen endpoint should be the same as standard consistentHash") } // now we know that noLoad is the endpoint which should be requested for somekey if load is not an issue. - addInflightRequests(ctx.Registry, noLoad, 20) + addInflightRequests(endpointRegistry, noLoad, 20) failover1 := ch.Apply(ctx) if failover1 == nonBounded { t.Error("When the selected endpoint is overloaded, the chosen endpoint should be different to standard consistentHash") } // now if 2 endpoints are overloaded, the request should go to the final endpoint - addInflightRequests(ctx.Registry, failover1, 20) + addInflightRequests(endpointRegistry, failover1, 20) failover2 := ch.Apply(ctx) if failover2 == nonBounded || failover2 == failover1 { t.Error("Only the final endpoint had load below the average * balanceFactor, so it should have been selected.") } // now all will have same load, should select the original endpoint again - addInflightRequests(ctx.Registry, failover2, 20) + addInflightRequests(endpointRegistry, failover2, 20) allLoaded := ch.Apply(ctx) if allLoaded != nonBounded { t.Error("When all endpoints have the same load, the consistentHash endpoint should be chosen again.") @@ -427,9 +427,9 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) { Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor}, - Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}), } - ctx.Registry.Do([]*routing.Route{route}) + endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + endpointRegistry.Do([]*routing.Route{route}) for i := 0; i < 100; i++ { ep := ch.Apply(ctx) @@ -437,16 +437,16 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) { ifr1 := route.LBEndpoints[1].Metrics.InflightRequests() ifr2 := route.LBEndpoints[2].Metrics.InflightRequests() - assert.Equal(t, int64(ifr0), ctx.Registry.GetMetrics(route.LBEndpoints[0].Host).InflightRequests()) - assert.Equal(t, int64(ifr1), ctx.Registry.GetMetrics(route.LBEndpoints[1].Host).InflightRequests()) - assert.Equal(t, int64(ifr2), ctx.Registry.GetMetrics(route.LBEndpoints[2].Host).InflightRequests()) + assert.Equal(t, int64(ifr0), endpointRegistry.GetMetrics(route.LBEndpoints[0].Host).InflightRequests()) + assert.Equal(t, int64(ifr1), endpointRegistry.GetMetrics(route.LBEndpoints[1].Host).InflightRequests()) + assert.Equal(t, int64(ifr2), endpointRegistry.GetMetrics(route.LBEndpoints[2].Host).InflightRequests()) avg := float64(ifr0+ifr1+ifr2) / 3.0 limit := int64(avg*balanceFactor) + 1 if ifr0 > limit || ifr1 > limit || ifr2 > limit { t.Errorf("Expected in-flight requests for each endpoint to be less than %d. In-flight request counts: %d, %d, %d", limit, ifr0, ifr1, ifr2) } - ctx.Registry.GetMetrics(ep.Host).IncInflightRequest() + endpointRegistry.GetMetrics(ep.Host).IncInflightRequest() } } diff --git a/proxy/fadein.go b/proxy/fadein.go index 52b20b3690..80857b41f7 100644 --- a/proxy/fadein.go +++ b/proxy/fadein.go @@ -5,19 +5,15 @@ import ( "math/rand" "time" - "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/routing" ) -func returnFast(rt *routing.Route) bool { - if rt.BackendType != eskip.LBBackend { - return true - } - - return rt.LBFadeInDuration <= 0 +type fadeIn struct { + rnd *rand.Rand + endpointRegistry *routing.EndpointRegistry } -func fadeIn(lifetime time.Duration, duration time.Duration, exponent float64) float64 { +func (f *fadeIn) fadeInScore(lifetime time.Duration, duration time.Duration, exponent float64) float64 { fadingIn := lifetime > 0 && lifetime < duration if !fadingIn { return 1 @@ -26,18 +22,19 @@ func fadeIn(lifetime time.Duration, duration time.Duration, exponent float64) fl return math.Pow(float64(lifetime)/float64(duration), exponent) } -func filterFadeIn(endpoints []routing.LBEndpoint, rt *routing.Route, registry *routing.EndpointRegistry, rnd *rand.Rand) []routing.LBEndpoint { - if returnFast(rt) { +func (f *fadeIn) filterFadeIn(endpoints []routing.LBEndpoint, rt *routing.Route) []routing.LBEndpoint { + if rt.LBFadeInDuration <= 0 { return endpoints } now := time.Now() - threshold := rnd.Float64() + threshold := f.rnd.Float64() filtered := make([]routing.LBEndpoint, 0, len(endpoints)) for _, e := range endpoints { - f := fadeIn( - now.Sub(e.Metrics.DetectedTime()), + age := now.Sub(e.Metrics.DetectedTime()) + f := f.fadeInScore( + age, rt.LBFadeInDuration, rt.LBFadeInExponent, ) diff --git a/proxy/fadein_internal_test.go b/proxy/fadein_internal_test.go index a18e79e08d..8b073b715a 100644 --- a/proxy/fadein_internal_test.go +++ b/proxy/fadein_internal_test.go @@ -92,7 +92,7 @@ func initializeEndpoints(endpointAges []float64, algorithmName string, fadeInDur registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i]) } - proxy := &Proxy{registry: registry, rnd: rand.New(loadbalancer.NewLockedSource())} + proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}} return route, proxy, eps } diff --git a/proxy/proxy.go b/proxy/proxy.go index f90af2ad43..9109364aa0 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -326,7 +326,7 @@ type Proxy struct { defaultHTTPStatus int routing *routing.Routing registry *routing.EndpointRegistry - rnd *rand.Rand + fadein *fadeIn roundTripper http.RoundTripper priorityRoutes []PriorityRoute flags Flags @@ -469,14 +469,13 @@ func setRequestURLForDynamicBackend(u *url.URL, stateBag map[string]interface{}) func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint { rt := ctx.route endpoints := rt.LBEndpoints - endpoints = filterFadeIn(endpoints, rt, p.registry, p.rnd) + endpoints = p.fadein.filterFadeIn(endpoints, rt) lbctx := &routing.LBContext{ Request: ctx.request, Route: rt, LBEndpoints: endpoints, Params: ctx.StateBag(), - Registry: p.registry, } e := rt.LBAlgorithm.Apply(lbctx) @@ -726,6 +725,7 @@ func WithParams(p Params) *Proxy { return &Proxy{ routing: p.Routing, registry: p.EndpointRegistry, + fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: p.EndpointRegistry}, roundTripper: p.CustomHttpRoundTripperWrap(tr), priorityRoutes: p.PriorityRoutes, flags: p.Flags, @@ -747,8 +747,6 @@ func WithParams(p Params) *Proxy { clientTLS: tr.TLSClientConfig, hostname: hostname, onPanicSometimes: rate.Sometimes{First: 3, Interval: 1 * time.Minute}, - /* #nosec */ - rnd: rand.New(loadbalancer.NewLockedSource()), } } diff --git a/routing/routing.go b/routing/routing.go index 9cb39e23e0..efdb746305 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -170,7 +170,6 @@ type LBContext struct { Route *Route LBEndpoints []LBEndpoint Params map[string]interface{} - Registry *EndpointRegistry } // NewLBContext is used to create a new LBContext, to pass data to the