Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minor: refactor fadeIn code #2886

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions loadbalancer/algorithm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func TestApply(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
req, _ := http.NewRequest("GET", "http://127.0.0.1:1234/foo", nil)
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand All @@ -267,14 +268,13 @@ func TestApply(t *testing.T) {
},
}
rt := p.Do([]*routing.Route{r})
endpointRegistry.Do([]*routing.Route{r})

lbctx := &routing.LBContext{
Request: req,
Route: rt[0],
LBEndpoints: rt[0].LBEndpoints,
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}
lbctx.Registry.Do([]*routing.Route{r})

h := make(map[string]int)
for i := 0; i < R; i++ {
Expand Down Expand Up @@ -304,7 +304,7 @@ func TestConsistentHashSearch(t *testing.T) {
endpointRegistry.Do([]*routing.Route{r})

ch := newConsistentHash(endpoints).(*consistentHash)
ctx := &routing.LBContext{Route: r, LBEndpoints: r.LBEndpoints, Params: map[string]interface{}{ConsistentHashKey: key}, Registry: endpointRegistry}
ctx := &routing.LBContext{Route: r, LBEndpoints: r.LBEndpoints, Params: map[string]interface{}{ConsistentHashKey: key}}
return endpoints[ch.search(key, ctx)]
}

Expand Down Expand Up @@ -347,31 +347,31 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) {
Route: route,
LBEndpoints: route.LBEndpoints,
Params: map[string]interface{}{ConsistentHashBalanceFactor: 1.25},
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}
ctx.Registry.Do([]*routing.Route{route})
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
endpointRegistry.Do([]*routing.Route{route})
noLoad := ch.Apply(ctx)
nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]interface{}{}})

if noLoad != nonBounded {
t.Error("When no endpoints are overloaded, the chosen endpoint should be the same as standard consistentHash")
}
// now we know that noLoad is the endpoint which should be requested for somekey if load is not an issue.
addInflightRequests(ctx.Registry, noLoad, 20)
addInflightRequests(endpointRegistry, noLoad, 20)
failover1 := ch.Apply(ctx)
if failover1 == nonBounded {
t.Error("When the selected endpoint is overloaded, the chosen endpoint should be different to standard consistentHash")
}

// now if 2 endpoints are overloaded, the request should go to the final endpoint
addInflightRequests(ctx.Registry, failover1, 20)
addInflightRequests(endpointRegistry, failover1, 20)
failover2 := ch.Apply(ctx)
if failover2 == nonBounded || failover2 == failover1 {
t.Error("Only the final endpoint had load below the average * balanceFactor, so it should have been selected.")
}

// now all will have same load, should select the original endpoint again
addInflightRequests(ctx.Registry, failover2, 20)
addInflightRequests(endpointRegistry, failover2, 20)
allLoaded := ch.Apply(ctx)
if allLoaded != nonBounded {
t.Error("When all endpoints have the same load, the consistentHash endpoint should be chosen again.")
Expand Down Expand Up @@ -427,26 +427,26 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
Route: route,
LBEndpoints: route.LBEndpoints,
Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor},
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}
ctx.Registry.Do([]*routing.Route{route})
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
endpointRegistry.Do([]*routing.Route{route})

for i := 0; i < 100; i++ {
ep := ch.Apply(ctx)
ifr0 := route.LBEndpoints[0].Metrics.InflightRequests()
ifr1 := route.LBEndpoints[1].Metrics.InflightRequests()
ifr2 := route.LBEndpoints[2].Metrics.InflightRequests()

assert.Equal(t, int64(ifr0), ctx.Registry.GetMetrics(route.LBEndpoints[0].Host).InflightRequests())
assert.Equal(t, int64(ifr1), ctx.Registry.GetMetrics(route.LBEndpoints[1].Host).InflightRequests())
assert.Equal(t, int64(ifr2), ctx.Registry.GetMetrics(route.LBEndpoints[2].Host).InflightRequests())
assert.Equal(t, int64(ifr0), endpointRegistry.GetMetrics(route.LBEndpoints[0].Host).InflightRequests())
assert.Equal(t, int64(ifr1), endpointRegistry.GetMetrics(route.LBEndpoints[1].Host).InflightRequests())
assert.Equal(t, int64(ifr2), endpointRegistry.GetMetrics(route.LBEndpoints[2].Host).InflightRequests())

avg := float64(ifr0+ifr1+ifr2) / 3.0
limit := int64(avg*balanceFactor) + 1
if ifr0 > limit || ifr1 > limit || ifr2 > limit {
t.Errorf("Expected in-flight requests for each endpoint to be less than %d. In-flight request counts: %d, %d, %d", limit, ifr0, ifr1, ifr2)
}
ctx.Registry.GetMetrics(ep.Host).IncInflightRequest()
endpointRegistry.GetMetrics(ep.Host).IncInflightRequest()
}
}

Expand Down
23 changes: 10 additions & 13 deletions proxy/fadein.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@ import (
"math/rand"
"time"

"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/routing"
)

func returnFast(rt *routing.Route) bool {
if rt.BackendType != eskip.LBBackend {
RomanZavodskikh marked this conversation as resolved.
Show resolved Hide resolved
return true
}

return rt.LBFadeInDuration <= 0
type fadeIn struct {
rnd *rand.Rand
endpointRegistry *routing.EndpointRegistry
}

func fadeIn(lifetime time.Duration, duration time.Duration, exponent float64) float64 {
func (f *fadeIn) fadeInScore(lifetime time.Duration, duration time.Duration, exponent float64) float64 {
fadingIn := lifetime > 0 && lifetime < duration
if !fadingIn {
return 1
Expand All @@ -26,18 +22,19 @@ func fadeIn(lifetime time.Duration, duration time.Duration, exponent float64) fl
return math.Pow(float64(lifetime)/float64(duration), exponent)
}

func filterFadeIn(endpoints []routing.LBEndpoint, rt *routing.Route, registry *routing.EndpointRegistry, rnd *rand.Rand) []routing.LBEndpoint {
if returnFast(rt) {
func (f *fadeIn) filterFadeIn(endpoints []routing.LBEndpoint, rt *routing.Route) []routing.LBEndpoint {
if rt.LBFadeInDuration <= 0 {
return endpoints
}

now := time.Now()
threshold := rnd.Float64()
threshold := f.rnd.Float64()

filtered := make([]routing.LBEndpoint, 0, len(endpoints))
for _, e := range endpoints {
f := fadeIn(
now.Sub(e.Metrics.DetectedTime()),
age := now.Sub(e.Metrics.DetectedTime())
f := f.fadeInScore(
age,
rt.LBFadeInDuration,
rt.LBFadeInExponent,
)
Expand Down
2 changes: 1 addition & 1 deletion proxy/fadein_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func initializeEndpoints(endpointAges []float64, algorithmName string, fadeInDur
registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}

proxy := &Proxy{registry: registry, rnd: rand.New(loadbalancer.NewLockedSource())}
proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}}
return route, proxy, eps
}

Expand Down
8 changes: 3 additions & 5 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ type Proxy struct {
defaultHTTPStatus int
routing *routing.Routing
registry *routing.EndpointRegistry
rnd *rand.Rand
fadein *fadeIn
roundTripper http.RoundTripper
priorityRoutes []PriorityRoute
flags Flags
Expand Down Expand Up @@ -469,14 +469,13 @@ func setRequestURLForDynamicBackend(u *url.URL, stateBag map[string]interface{})
func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint {
rt := ctx.route
endpoints := rt.LBEndpoints
endpoints = filterFadeIn(endpoints, rt, p.registry, p.rnd)
endpoints = p.fadein.filterFadeIn(endpoints, rt)

lbctx := &routing.LBContext{
Request: ctx.request,
Route: rt,
LBEndpoints: endpoints,
Params: ctx.StateBag(),
Registry: p.registry,
}

e := rt.LBAlgorithm.Apply(lbctx)
Expand Down Expand Up @@ -726,6 +725,7 @@ func WithParams(p Params) *Proxy {
return &Proxy{
routing: p.Routing,
registry: p.EndpointRegistry,
fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: p.EndpointRegistry},
roundTripper: p.CustomHttpRoundTripperWrap(tr),
priorityRoutes: p.PriorityRoutes,
flags: p.Flags,
Expand All @@ -747,8 +747,6 @@ func WithParams(p Params) *Proxy {
clientTLS: tr.TLSClientConfig,
hostname: hostname,
onPanicSometimes: rate.Sometimes{First: 3, Interval: 1 * time.Minute},
/* #nosec */
rnd: rand.New(loadbalancer.NewLockedSource()),
}
}

Expand Down
1 change: 0 additions & 1 deletion routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ type LBContext struct {
Route *Route
LBEndpoints []LBEndpoint
Params map[string]interface{}
Registry *EndpointRegistry
}

// NewLBContext is used to create a new LBContext, to pass data to the
Expand Down
Loading