From 11127837e8644dec727cf7b0137141302587cac5 Mon Sep 17 00:00:00 2001 From: Roman Zavodskikh Date: Tue, 26 Sep 2023 17:59:48 +0200 Subject: [PATCH] Decouple fadeIn from loadbalancer Signed-off-by: Roman Zavodskikh --- loadbalancer/algorithm.go | 176 ++--------- loadbalancer/algorithm_test.go | 15 +- loadbalancer/fadein_test.go | 449 ----------------------------- loadbalancer/locked_source.go | 2 +- loadbalancer/locked_source_test.go | 2 +- proxy/fadein.go | 53 ++++ proxy/fadein_internal_test.go | 414 ++++++++++++++++++++++++++ proxy/proxy.go | 25 +- skptesting/run_fadein_test.sh | 4 +- 9 files changed, 531 insertions(+), 609 deletions(-) delete mode 100644 loadbalancer/fadein_test.go create mode 100644 proxy/fadein.go create mode 100644 proxy/fadein_internal_test.go diff --git a/loadbalancer/algorithm.go b/loadbalancer/algorithm.go index ce0c79f8fb..c98ceee40e 100644 --- a/loadbalancer/algorithm.go +++ b/loadbalancer/algorithm.go @@ -3,12 +3,10 @@ package loadbalancer import ( "errors" "fmt" - "math" "math/rand" "sort" "sync" "sync/atomic" - "time" "github.com/cespare/xxhash/v2" log "github.com/sirupsen/logrus" @@ -53,106 +51,14 @@ var ( defaultAlgorithm = newRoundRobin ) -func fadeInState(now time.Time, duration time.Duration, detected time.Time) (time.Duration, bool) { - rel := now.Sub(detected) - return rel, rel > 0 && rel < duration -} - -func fadeIn(now time.Time, duration time.Duration, exponent float64, detected time.Time) float64 { - rel, fadingIn := fadeInState(now, duration, detected) - if !fadingIn { - return 1 - } - - return math.Pow(float64(rel)/float64(duration), exponent) -} - -func shiftWeighted(rnd *rand.Rand, ctx *routing.LBContext, now time.Time) routing.LBEndpoint { - var sum float64 - rt := ctx.Route - ep := ctx.LBEndpoints - for _, epi := range ep { - wi := fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Metrics.DetectedTime()) - sum += wi - } - - choice := ep[len(ep)-1] - r := rnd.Float64() * sum - var upto float64 - for i, epi := range ep { - upto += fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Metrics.DetectedTime()) - if upto > r { - choice = ep[i] - break - } - } - - return choice -} - -func shiftToRemaining(rnd *rand.Rand, ctx *routing.LBContext, wi []int, now time.Time) routing.LBEndpoint { - notFadingIndexes := wi - ep := ctx.LBEndpoints - - // if all endpoints are fading, the simplest approach is to use the oldest, - // this departs from the desired curve, but guarantees monotonic fade-in. From - // the perspective of the oldest endpoint, this is temporarily the same as if - // there was no fade-in. - if len(notFadingIndexes) == 0 { - return shiftWeighted(rnd, ctx, now) - } - - // otherwise equally distribute between the old endpoints - return ep[notFadingIndexes[rnd.Intn(len(notFadingIndexes))]] -} - -func withFadeIn(rnd *rand.Rand, ctx *routing.LBContext, choice int, algo routing.LBAlgorithm) routing.LBEndpoint { - ep := ctx.LBEndpoints - now := time.Now() - f := fadeIn( - now, - ctx.Route.LBFadeInDuration, - ctx.Route.LBFadeInExponent, - ctx.LBEndpoints[choice].Metrics.DetectedTime(), - ) - - if rnd.Float64() < f { - return ep[choice] - } - notFadingIndexes := make([]int, 0, len(ep)) - for i := 0; i < len(ep); i++ { - if _, fadingIn := fadeInState(now, ctx.Route.LBFadeInDuration, ep[i].Metrics.DetectedTime()); !fadingIn { - notFadingIndexes = append(notFadingIndexes, i) - } - } - - switch a := algo.(type) { - case *roundRobin: - return shiftToRemaining(a.rnd, ctx, notFadingIndexes, now) - case *random: - return shiftToRemaining(a.rnd, ctx, notFadingIndexes, now) - case *consistentHash: - // If all endpoints are fading, normal consistent hash result - if len(notFadingIndexes) == 0 { - return ep[choice] - } - // otherwise calculate consistent hash again using endpoints which are not fading - return ep[a.chooseConsistentHashEndpoint(ctx, skipFadingEndpoints(notFadingIndexes))] - default: - return ep[choice] - } -} - type roundRobin struct { index int64 - rnd *rand.Rand } func newRoundRobin(endpoints []string) routing.LBAlgorithm { - rnd := rand.New(newLockedSource()) // #nosec + rnd := rand.New(NewLockedSource()) // #nosec return &roundRobin{ index: int64(rnd.Intn(len(endpoints))), - rnd: rnd, } } @@ -162,13 +68,8 @@ func (r *roundRobin) Apply(ctx *routing.LBContext) routing.LBEndpoint { return ctx.LBEndpoints[0] } - index := int(atomic.AddInt64(&r.index, 1) % int64(len(ctx.LBEndpoints))) - - if ctx.Route.LBFadeInDuration <= 0 { - return ctx.LBEndpoints[index] - } - - return withFadeIn(r.rnd, ctx, index, r) + choice := int(atomic.AddInt64(&r.index, 1) % int64(len(ctx.LBEndpoints))) + return ctx.LBEndpoints[choice] } type random struct { @@ -178,7 +79,7 @@ type random struct { func newRandom(endpoints []string) routing.LBAlgorithm { // #nosec return &random{ - rnd: rand.New(newLockedSource()), + rnd: rand.New(NewLockedSource()), } } @@ -188,12 +89,8 @@ func (r *random) Apply(ctx *routing.LBContext) routing.LBEndpoint { return ctx.LBEndpoints[0] } - i := r.rnd.Intn(len(ctx.LBEndpoints)) - if ctx.Route.LBFadeInDuration <= 0 { - return ctx.LBEndpoints[i] - } - - return withFadeIn(r.rnd, ctx, i, r) + choice := r.rnd.Intn(len(ctx.LBEndpoints)) + return ctx.LBEndpoints[choice] } type ( @@ -203,7 +100,6 @@ type ( } consistentHash struct { hashRing []endpointHash // list of endpoints sorted by hash value - rnd *rand.Rand } ) @@ -214,10 +110,8 @@ func (ch *consistentHash) Swap(i, j int) { } func newConsistentHashInternal(endpoints []string, hashesPerEndpoint int) routing.LBAlgorithm { - rnd := rand.New(newLockedSource()) // #nosec ch := &consistentHash{ hashRing: make([]endpointHash, hashesPerEndpoint*len(endpoints)), - rnd: rnd, } for i, ep := range endpoints { endpointStartIndex := hashesPerEndpoint * i @@ -237,22 +131,32 @@ func hash(s string) uint64 { return xxhash.Sum64String(s) } +func skipEndpoint(c *routing.LBContext, index int) bool { + host := c.Route.LBEndpoints[index].Host + for i := range c.LBEndpoints { + if c.LBEndpoints[i].Host == host { + return false + } + } + return true +} + // Returns index in hash ring with the closest hash to key's hash -func (ch *consistentHash) searchRing(key string, skipEndpoint func(int) bool) int { +func (ch *consistentHash) searchRing(key string, ctx *routing.LBContext) int { h := hash(key) i := sort.Search(ch.Len(), func(i int) bool { return ch.hashRing[i].hash >= h }) if i == ch.Len() { // rollover i = 0 } - for skipEndpoint(ch.hashRing[i].index) { + for skipEndpoint(ctx, ch.hashRing[i].index) { i = (i + 1) % ch.Len() } return i } // Returns index of endpoint with closest hash to key's hash -func (ch *consistentHash) search(key string, skipEndpoint func(int) bool) int { - ringIndex := ch.searchRing(key, skipEndpoint) +func (ch *consistentHash) search(key string, ctx *routing.LBContext) int { + ringIndex := ch.searchRing(key, ctx) return ch.hashRing[ringIndex].index } @@ -266,15 +170,15 @@ func computeLoadAverage(ctx *routing.LBContext) float64 { } // Returns index of endpoint with closest hash to key's hash, which is also below the target load -// skipEndpoint function is used to skip endpoints we don't want, such as fading endpoints -func (ch *consistentHash) boundedLoadSearch(key string, balanceFactor float64, ctx *routing.LBContext, skipEndpoint func(int) bool) int { - ringIndex := ch.searchRing(key, skipEndpoint) +// skipEndpoint function is used to skip endpoints we don't want, for example, fading endpoints +func (ch *consistentHash) boundedLoadSearch(key string, balanceFactor float64, ctx *routing.LBContext) int { + ringIndex := ch.searchRing(key, ctx) averageLoad := computeLoadAverage(ctx) targetLoad := averageLoad * balanceFactor // Loop round ring, starting at endpoint with closest hash. Stop when we find one whose load is less than targetLoad. for i := 0; i < ch.Len(); i++ { endpointIndex := ch.hashRing[ringIndex].index - if skipEndpoint(endpointIndex) { + if skipEndpoint(ctx, endpointIndex) { continue } load := ctx.LBEndpoints[endpointIndex].Metrics.InflightRequests() @@ -295,16 +199,11 @@ func (ch *consistentHash) Apply(ctx *routing.LBContext) routing.LBEndpoint { return ctx.LBEndpoints[0] } - choice := ch.chooseConsistentHashEndpoint(ctx, noSkippedEndpoints) - - if ctx.Route.LBFadeInDuration <= 0 { - return ctx.LBEndpoints[choice] - } - - return withFadeIn(ch.rnd, ctx, choice, ch) + choice := ch.chooseConsistentHashEndpoint(ctx) + return ctx.LBEndpoints[choice] } -func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext, skipEndpoint func(int) bool) int { +func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext) int { key, ok := ctx.Params[ConsistentHashKey].(string) if !ok { key = snet.RemoteHost(ctx.Request).String() @@ -312,29 +211,14 @@ func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext, s balanceFactor, ok := ctx.Params[ConsistentHashBalanceFactor].(float64) var choice int if !ok { - choice = ch.search(key, skipEndpoint) + choice = ch.search(key, ctx) } else { - choice = ch.boundedLoadSearch(key, balanceFactor, ctx, skipEndpoint) + choice = ch.boundedLoadSearch(key, balanceFactor, ctx) } return choice } -func skipFadingEndpoints(notFadingEndpoints []int) func(int) bool { - return func(i int) bool { - for _, notFadingEndpoint := range notFadingEndpoints { - if i == notFadingEndpoint { - return false - } - } - return true - } -} - -func noSkippedEndpoints(_ int) bool { - return false -} - type powerOfRandomNChoices struct { mu sync.Mutex rnd *rand.Rand @@ -343,7 +227,7 @@ type powerOfRandomNChoices struct { // newPowerOfRandomNChoices selects N random backends and picks the one with less outstanding requests. func newPowerOfRandomNChoices([]string) routing.LBAlgorithm { - rnd := rand.New(newLockedSource()) // #nosec + rnd := rand.New(NewLockedSource()) // #nosec return &powerOfRandomNChoices{ rnd: rnd, numberOfChoices: powerOfRandomNChoicesDefaultN, diff --git a/loadbalancer/algorithm_test.go b/loadbalancer/algorithm_test.go index e90a3e1c16..3bef23bdac 100644 --- a/loadbalancer/algorithm_test.go +++ b/loadbalancer/algorithm_test.go @@ -291,8 +291,21 @@ func TestApply(t *testing.T) { func TestConsistentHashSearch(t *testing.T) { apply := func(key string, endpoints []string) string { + p := NewAlgorithmProvider() + endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + r := &routing.Route{ + Route: eskip.Route{ + BackendType: eskip.LBBackend, + LBAlgorithm: ConsistentHash.String(), + LBEndpoints: endpoints, + }, + } + p.Do([]*routing.Route{r}) + endpointRegistry.Do([]*routing.Route{r}) + ch := newConsistentHash(endpoints).(*consistentHash) - return endpoints[ch.search(key, noSkippedEndpoints)] + ctx := &routing.LBContext{Route: r, LBEndpoints: r.LBEndpoints, Params: map[string]interface{}{ConsistentHashKey: key}, Registry: endpointRegistry} + return endpoints[ch.search(key, ctx)] } endpoints := []string{"http://127.0.0.1:8080", "http://127.0.0.2:8080", "http://127.0.0.3:8080"} diff --git a/loadbalancer/fadein_test.go b/loadbalancer/fadein_test.go deleted file mode 100644 index a2cb2720ef..0000000000 --- a/loadbalancer/fadein_test.go +++ /dev/null @@ -1,449 +0,0 @@ -package loadbalancer - -import ( - "fmt" - "math/rand" - "strconv" - "strings" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/zalando/skipper/routing" -) - -const ( - fadeInRequestCount = 300_000 - bucketCount = 20 - monotonyTolerance = 0.4 // we need to use a high tolerance for CI testing - - defaultFadeInDuration = 500 * time.Millisecond - defaultFadeInDurationHuge = 24 * time.Hour // we need this to be sure we're at the very beginning of fading in -) - -func absint(i int) int { - if i < 0 { - return -i - } - - return i -} - -func tolerance(prev, next int) int { - return int(float64(prev+next) * monotonyTolerance / 2) -} - -func checkMonotony(direction, prev, next int) bool { - t := tolerance(prev, next) - switch direction { - case 1: - return next-prev >= -t - case -1: - return next-prev <= t - default: - return absint(next-prev) < t - } -} - -func multiply(k float64, d time.Duration) time.Duration { - return time.Duration(k * float64(d)) -} - -func initializeEndpoints(endpointAges []float64, fadeInDuration time.Duration) (*routing.LBContext, []string) { - var detectionTimes []time.Time - now := time.Now() - for _, ea := range endpointAges { - endpointAgeDuration := multiply(ea, fadeInDuration) - detectionTimes = append(detectionTimes, now.Add(-endpointAgeDuration)) - } - - var eps []string - for i, ea := range endpointAges { - endpointAgeDuration := multiply(ea, fadeInDuration) - eps = append(eps, fmt.Sprintf("ep-%d-%s.test", i, endpointAgeDuration)) - } - - ctx := &routing.LBContext{ - Params: map[string]interface{}{}, - Route: &routing.Route{ - LBFadeInDuration: fadeInDuration, - LBFadeInExponent: 1, - }, - Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}), - } - - for i := range eps { - ctx.Route.LBEndpoints = append(ctx.Route.LBEndpoints, routing.LBEndpoint{ - Host: eps[i], - Metrics: ctx.Registry.GetMetrics(eps[i]), - }) - ctx.Registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i]) - } - ctx.LBEndpoints = ctx.Route.LBEndpoints - - return ctx, eps -} - -// This function is needed to calculate the duration needed to send fadeInRequestCount requests. -// It is needed to send number of requests close to fadeInRequestCount on one hand and to have them sent exactly -// in the fade-in duration returned on the other hand. -func calculateFadeInDuration(t *testing.T, algorithm func([]string) routing.LBAlgorithm, endpointAges []float64) time.Duration { - const precalculateRatio = 10 - - ctx, eps := initializeEndpoints(endpointAges, defaultFadeInDuration) - a := algorithm(eps) - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - - t.Log("preemulation start", time.Now()) - // Preemulate the load balancer loop to find out the approximate amount of RPS - begin := time.Now() - for i := 0; i < fadeInRequestCount/precalculateRatio; i++ { - ctx.Params[ConsistentHashKey] = strconv.Itoa(rnd.Intn(100000)) - _ = a.Apply(ctx) - } - preemulationDuration := time.Since(begin) - - return preemulationDuration * precalculateRatio -} - -func testFadeIn( - t *testing.T, - name string, - algorithm func([]string) routing.LBAlgorithm, - endpointAges ...float64, -) { - t.Run(name, func(t *testing.T) { - fadeInDuration := calculateFadeInDuration(t, algorithm, endpointAges) - ctx, eps := initializeEndpoints(endpointAges, fadeInDuration) - a := algorithm(eps) - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - - t.Log("test start", time.Now()) - var stats []string - stop := time.After(fadeInDuration) - // Emulate the load balancer loop, sending requests to it with random hash keys - // over and over again till fadeIn period is over. - func() { - for { - ctx.Params[ConsistentHashKey] = strconv.Itoa(rnd.Intn(100000)) - ep := a.Apply(ctx) - stats = append(stats, ep.Host) - select { - case <-stop: - return - default: - } - } - }() - - // Split fade-in period into buckets and count how many times each endpoint was selected. - t.Log("test done", time.Now()) - t.Log("CSV timestamp," + strings.Join(eps, ",")) - bucketSize := len(stats) / bucketCount - var allBuckets []map[string]int - for i := 0; i < bucketCount; i++ { - bucketStats := make(map[string]int) - for j := i * bucketSize; j < (i+1)*bucketSize; j++ { - bucketStats[stats[j]]++ - } - - allBuckets = append(allBuckets, bucketStats) - } - - directions := make(map[string]int) - for _, epi := range eps { - first := allBuckets[0][epi] - last := allBuckets[len(allBuckets)-1][epi] - t := tolerance(first, last) - switch { - case last-first > t: - directions[epi] = 1 - case last-first < t: - directions[epi] = -1 - } - } - - for i := range allBuckets { - // trim first and last (warmup and settling) - if i < 2 || i == len(allBuckets)-1 { - continue - } - - for _, epi := range eps { - if !checkMonotony( - directions[epi], - allBuckets[i-1][epi], - allBuckets[i][epi], - ) { - t.Error("non-monotonic change", epi, i) - } - } - } - - for i, bucketStats := range allBuckets { - var showStats []string - for _, epi := range eps { - showStats = append(showStats, fmt.Sprintf("%d", bucketStats[epi])) - } - - // Print CSV-like output for, where row number represents time and - // column represents endpoint. You can visualize it using - // ./skptesting/run_fadein_test.sh from the skipper repo root. - t.Log("CSV " + fmt.Sprintf("%d,", i) + strings.Join(showStats, ",")) - } - }) -} - -func newConsistentHashForTest(endpoints []string) routing.LBAlgorithm { - // The default parameter 100 is too small to get even distribution - return newConsistentHashInternal(endpoints, 1000) -} - -// Those tests check that the amount of requests per period for each endpoint is monotonical over the time. -// For every endpoint, it could increase, decrease or stay the same. -func TestFadeIn(t *testing.T) { - old := 2.0 - testFadeIn(t, "power-of-n-random-choices, 0", newPowerOfRandomNChoices, old, old) - testFadeIn(t, "power-of-n-random-choices, 1", newPowerOfRandomNChoices, 0, old) - testFadeIn(t, "power-of-n-random-choices, 2", newPowerOfRandomNChoices, 0, 0) - testFadeIn(t, "power-of-n-random-choices, 3", newPowerOfRandomNChoices, old, 0) - testFadeIn(t, "power-of-n-random-choices, 4", newPowerOfRandomNChoices, old, old, old, 0) - testFadeIn(t, "power-of-n-random-choices, 5", newPowerOfRandomNChoices, old, old, old, 0, 0, 0) - testFadeIn(t, "power-of-n-random-choices, 6", newPowerOfRandomNChoices, old, 0, 0, 0) - testFadeIn(t, "power-of-n-random-choices, 7", newPowerOfRandomNChoices, old, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "power-of-n-random-choices, 8", newPowerOfRandomNChoices, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "power-of-n-random-choices, 9", newPowerOfRandomNChoices, 1.0/2.0, 1.0/3.0, 1.0/4.0) - - testFadeIn(t, "round-robin, 0", newRoundRobin, old, old) - testFadeIn(t, "round-robin, 1", newRoundRobin, 0, old) - testFadeIn(t, "round-robin, 2", newRoundRobin, 0, 0) - testFadeIn(t, "round-robin, 3", newRoundRobin, old, 0) - testFadeIn(t, "round-robin, 4", newRoundRobin, old, old, old, 0) - testFadeIn(t, "round-robin, 5", newRoundRobin, old, old, old, 0, 0, 0) - testFadeIn(t, "round-robin, 6", newRoundRobin, old, 0, 0, 0) - testFadeIn(t, "round-robin, 7", newRoundRobin, old, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "round-robin, 8", newRoundRobin, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "round-robin, 9", newRoundRobin, 1.0/2.0, 1.0/3.0, 1.0/4.0) - - testFadeIn(t, "random, 0", newRandom, old, old) - testFadeIn(t, "random, 1", newRandom, 0, old) - testFadeIn(t, "random, 2", newRandom, 0, 0) - testFadeIn(t, "random, 3", newRandom, old, 0) - testFadeIn(t, "random, 4", newRandom, old, old, old, 0) - testFadeIn(t, "random, 5", newRandom, old, old, old, 0, 0, 0) - testFadeIn(t, "random, 6", newRandom, old, 0, 0, 0) - testFadeIn(t, "random, 7", newRandom, old, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "random, 8", newRandom, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "random, 9", newRandom, 1.0/2.0, 1.0/3.0, 1.0/4.0) - - testFadeIn(t, "consistent-hash, 0", newConsistentHashForTest, old, old) - testFadeIn(t, "consistent-hash, 1", newConsistentHashForTest, 0, old) - testFadeIn(t, "consistent-hash, 2", newConsistentHashForTest, 0, 0) - testFadeIn(t, "consistent-hash, 3", newConsistentHashForTest, old, 0) - testFadeIn(t, "consistent-hash, 4", newConsistentHashForTest, old, old, old, 0) - testFadeIn(t, "consistent-hash, 5", newConsistentHashForTest, old, old, old, 0, 0, 0) - testFadeIn(t, "consistent-hash, 6", newConsistentHashForTest, old, 0, 0, 0) - testFadeIn(t, "consistent-hash, 7", newConsistentHashForTest, old, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "consistent-hash, 8", newConsistentHashForTest, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "consistent-hash, 9", newConsistentHashForTest, 1.0/2.0, 1.0/3.0, 1.0/4.0) -} - -func testFadeInLoadBetweenOldAndNewEps( - t *testing.T, - name string, - algorithm func([]string) routing.LBAlgorithm, - nOld int, nNew int, -) { - t.Run(name, func(t *testing.T) { - const ( - numberOfReqs = 100000 - acceptableErrorNearZero = 10 - old = 1.0 - new = 0.0 - ) - endpointAges := []float64{} - for i := 0; i < nOld; i++ { - endpointAges = append(endpointAges, 1.0) - } - for i := 0; i < nNew; i++ { - endpointAges = append(endpointAges, 0.0) - } - - ctx, eps := initializeEndpoints(endpointAges, defaultFadeInDurationHuge) - - a := algorithm(eps) - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - nReqs := map[string]int{} - - t.Log("test start", time.Now()) - // Emulate the load balancer loop, sending requests to it with random hash keys - // over and over again till fadeIn period is over. - for i := 0; i < numberOfReqs; i++ { - ctx.Params[ConsistentHashKey] = strconv.Itoa(rnd.Intn(100000)) - ep := a.Apply(ctx) - nReqs[ep.Host]++ - } - - if nOld == 0 { - expectedReqsPerEndpoint := numberOfReqs / nNew - for _, ep := range eps { - assert.InEpsilon(t, expectedReqsPerEndpoint, nReqs[ep], 0.2) - } - } else { - expectedReqsPerOldEndpoint := numberOfReqs / nOld - for idx, ep := range eps { - if endpointAges[idx] == old { - assert.InEpsilon(t, expectedReqsPerOldEndpoint, nReqs[ep], 0.2) - } - if endpointAges[idx] == new { - assert.InDelta(t, 0, nReqs[ep], acceptableErrorNearZero) - } - } - } - }) -} - -// Those tests check that the amount of requests per period for every endpoint at the very beginning of fading in (when all endpoints are new) -// and at the very end of fading in (when all endpoints are old) is correct. -func TestFadeInLoadBetweenOldAndNewEps(t *testing.T) { - for nOld := 0; nOld < 6; nOld++ { - for nNew := 0; nNew < 6; nNew++ { - if nOld == 0 && nNew == 0 { - continue - } - - // This test does not work with power of n random choices at the moment, because there is no fadein for - // this algorithm. Feel free to uncomment this line when this problem is fixed. - // testFadeInLoadBetweenOldAndNewEps(t, fmt.Sprintf("power-of-n-random-choices, %d old, %d new", nOld, nNew), newPowerOfRandomNChoices, nOld, nNew) - - testFadeInLoadBetweenOldAndNewEps(t, fmt.Sprintf("consistent-hash, %d old, %d new", nOld, nNew), newConsistentHash, nOld, nNew) - testFadeInLoadBetweenOldAndNewEps(t, fmt.Sprintf("random, %d old, %d new", nOld, nNew), newRandom, nOld, nNew) - testFadeInLoadBetweenOldAndNewEps(t, fmt.Sprintf("round-robin, %d old, %d new", nOld, nNew), newRoundRobin, nOld, nNew) - } - } -} - -func testApplyEndsWhenAllEndpointsAreFading( - t *testing.T, - name string, - algorithm func([]string) routing.LBAlgorithm, - nEndpoints int, -) { - t.Run(name, func(t *testing.T) { - // Initialize every endpoint with zero: every endpoint is new - endpointAges := make([]float64, nEndpoints) - - ctx, eps := initializeEndpoints(endpointAges, defaultFadeInDurationHuge) - - a := algorithm(eps) - ctx.Params[ConsistentHashKey] = "someConstantString" - applied := make(chan struct{}) - - go func() { - a.Apply(ctx) - close(applied) - }() - - select { - case <-time.After(time.Second): - t.Errorf("a.Apply has not finished in a reasonable time") - case <-applied: - break - } - }) -} - -func TestApplyEndsWhenAllEndpointsAreFading(t *testing.T) { - for nEndpoints := 1; nEndpoints < 10; nEndpoints++ { - testApplyEndsWhenAllEndpointsAreFading(t, "consistent-hash", newConsistentHash, nEndpoints) - testApplyEndsWhenAllEndpointsAreFading(t, "random", newRandom, nEndpoints) - testApplyEndsWhenAllEndpointsAreFading(t, "round-robin", newRoundRobin, nEndpoints) - } -} - -func benchmarkFadeIn( - b *testing.B, - name string, - algorithm func([]string) routing.LBAlgorithm, - clients int, - endpointAges ...float64, -) { - b.Run(name, func(b *testing.B) { - var detectionTimes []time.Time - now := time.Now() - for _, ea := range endpointAges { - detectionTimes = append(detectionTimes, now.Add(-defaultFadeInDuration*time.Duration(ea))) - } - - var eps []string - for i := range endpointAges { - eps = append(eps, string('a'+rune(i))) - } - - a := algorithm(eps) - - route := &routing.Route{ - LBFadeInDuration: defaultFadeInDuration, - LBFadeInExponent: 1, - } - registry := routing.NewEndpointRegistry(routing.RegistryOptions{}) - for i := range eps { - route.LBEndpoints = append(route.LBEndpoints, routing.LBEndpoint{ - Host: eps[i], - Metrics: registry.GetMetrics(eps[i]), - }) - registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i]) - } - - var wg sync.WaitGroup - - // Emulate the load balancer loop, sending requests to it with random hash keys - // over and over again till fadeIn period is over. - b.ResetTimer() - for i := 0; i < clients; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - ctx := &routing.LBContext{ - Params: map[string]interface{}{}, - Route: route, - Registry: registry, - } - - for j := 0; j < b.N/clients; j++ { - ctx.Params[ConsistentHashKey] = strconv.Itoa(rnd.Intn(100000)) - _ = a.Apply(ctx) - } - }(i) - } - - wg.Wait() - }) -} - -func repeatedSlice(v float64, n int) []float64 { - var s []float64 - for i := 0; i < n; i++ { - s = append(s, v) - } - return s -} - -func BenchmarkFadeIn(b *testing.B) { - old := 2.0 - clients := []int{1, 4, 16, 64, 256} - for _, c := range clients { - benchmarkFadeIn(b, fmt.Sprintf("random, 11, %d clients", c), newRandom, c, repeatedSlice(old, 200)...) - } - - for _, c := range clients { - benchmarkFadeIn(b, fmt.Sprintf("round-robin, 11, %d clients", c), newRoundRobin, c, repeatedSlice(old, 200)...) - } - - for _, c := range clients { - benchmarkFadeIn(b, fmt.Sprintf("consistent-hash, 11, %d clients", c), newConsistentHash, c, repeatedSlice(old, 200)...) - } -} diff --git a/loadbalancer/locked_source.go b/loadbalancer/locked_source.go index a662a35c4d..7a54bd980c 100644 --- a/loadbalancer/locked_source.go +++ b/loadbalancer/locked_source.go @@ -11,7 +11,7 @@ type lockedSource struct { r rand.Source } -func newLockedSource() *lockedSource { +func NewLockedSource() *lockedSource { return &lockedSource{r: rand.NewSource(time.Now().UnixNano())} } diff --git a/loadbalancer/locked_source_test.go b/loadbalancer/locked_source_test.go index c3cfa68df9..fb512b5b98 100644 --- a/loadbalancer/locked_source_test.go +++ b/loadbalancer/locked_source_test.go @@ -12,7 +12,7 @@ func loadTestLockedSource(s *lockedSource, n int) { } func TestLockedSourceForConcurrentUse(t *testing.T) { - s := newLockedSource() + s := NewLockedSource() var wg sync.WaitGroup for i := 0; i < 10; i++ { diff --git a/proxy/fadein.go b/proxy/fadein.go new file mode 100644 index 0000000000..7549c722e4 --- /dev/null +++ b/proxy/fadein.go @@ -0,0 +1,53 @@ +package proxy + +import ( + "math" + "math/rand" + "time" + + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/routing" +) + +func returnFast(rt *routing.Route) bool { + if rt.BackendType != eskip.LBBackend { + return true + } + + return rt.LBFadeInDuration <= 0 +} + +func fadeIn(lifetime time.Duration, duration time.Duration, exponent float64) float64 { + fadingIn := lifetime > 0 && lifetime < duration + if !fadingIn { + return 1 + } + + return math.Pow(float64(lifetime)/float64(duration), exponent) +} + +func filterFadeIn(endpoints []routing.LBEndpoint, rt *routing.Route, registry *routing.EndpointRegistry, rnd *rand.Rand) []routing.LBEndpoint { + if returnFast(rt) { + return endpoints + } + + now := time.Now() + threshold := rnd.Float64() + + filtered := make([]routing.LBEndpoint, 0, len(endpoints)) + for _, e := range endpoints { + f := fadeIn( + now.Sub(registry.GetMetrics(e.Host).DetectedTime()), + rt.LBFadeInDuration, + rt.LBFadeInExponent, + ) + if threshold < f { + filtered = append(filtered, e) + } + } + + if len(filtered) == 0 { + return endpoints + } + return filtered +} diff --git a/proxy/fadein_internal_test.go b/proxy/fadein_internal_test.go new file mode 100644 index 0000000000..a18e79e08d --- /dev/null +++ b/proxy/fadein_internal_test.go @@ -0,0 +1,414 @@ +package proxy + +import ( + "fmt" + "math/rand" + "net/http" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/loadbalancer" + "github.com/zalando/skipper/routing" +) + +const ( + fadeInRequestCount = 300_000 + bucketCount = 20 + monotonyTolerance = 0.4 // we need to use a high tolerance for CI testing + + defaultFadeInDuration = 500 * time.Millisecond + defaultFadeInDurationHuge = 24 * time.Hour // we need this to be sure we're at the very beginning of fading in +) + +func absint(i int) int { + if i < 0 { + return -i + } + + return i +} + +func tolerance(prev, next int) int { + return int(float64(prev+next) * monotonyTolerance / 2) +} + +func checkMonotony(direction, prev, next int) bool { + t := tolerance(prev, next) + switch direction { + case 1: + return next-prev >= -t + case -1: + return next-prev <= t + default: + return absint(next-prev) < t + } +} + +func multiply(k float64, d time.Duration) time.Duration { + return time.Duration(k * float64(d)) +} + +func initializeEndpoints(endpointAges []float64, algorithmName string, fadeInDuration time.Duration) (*routing.Route, *Proxy, []string) { + var detectionTimes []time.Time + now := time.Now() + for _, ea := range endpointAges { + endpointAgeDuration := multiply(ea, fadeInDuration) + detectionTimes = append(detectionTimes, now.Add(-endpointAgeDuration)) + } + + var eps []string + for i, ea := range endpointAges { + endpointAgeDuration := multiply(ea, fadeInDuration) + eps = append(eps, fmt.Sprintf("http://ep-%d-%s.test", i, endpointAgeDuration)) + } + + registry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + eskipRoute := eskip.Route{BackendType: eskip.LBBackend, LBAlgorithm: algorithmName} + for i := range eps { + eskipRoute.LBEndpoints = append(eskipRoute.LBEndpoints, eps[i]) + registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i]) + } + + route := &routing.Route{ + Route: eskipRoute, + LBFadeInDuration: fadeInDuration, + LBFadeInExponent: 1, + LBEndpoints: []routing.LBEndpoint{}, + } + + rt := loadbalancer.NewAlgorithmProvider().Do([]*routing.Route{route}) + route = rt[0] + registry.Do([]*routing.Route{route}) + + eps = []string{} + for i, ea := range endpointAges { + endpointAgeDuration := multiply(ea, fadeInDuration) + eps = append(eps, fmt.Sprintf("ep-%d-%s.test:80", i, endpointAgeDuration)) + registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i]) + } + + proxy := &Proxy{registry: registry, rnd: rand.New(loadbalancer.NewLockedSource())} + return route, proxy, eps +} + +// This function is needed to calculate the duration needed to send fadeInRequestCount requests. +// It is needed to send number of requests close to fadeInRequestCount on one hand and to have them sent exactly +// in the fade-in duration returned on the other hand. +func calculateFadeInDuration(t *testing.T, algorithmName string, endpointAges []float64) time.Duration { + const precalculateRatio = 10 + + route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDuration) + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + t.Log("preemulation start", time.Now()) + // Preemulate the load balancer loop to find out the approximate amount of RPS + begin := time.Now() + for i := 0; i < fadeInRequestCount/precalculateRatio; i++ { + _ = proxy.selectEndpoint(&context{route: route, request: &http.Request{}, stateBag: map[string]interface{}{loadbalancer.ConsistentHashKey: strconv.Itoa(rnd.Intn(100000))}}) + } + preemulationDuration := time.Since(begin) + + return preemulationDuration * precalculateRatio +} + +func testFadeInMonotony( + t *testing.T, + name string, + algorithmName string, + endpointAges ...float64, +) { + t.Run(name, func(t *testing.T) { + fadeInDuration := calculateFadeInDuration(t, algorithmName, endpointAges) + route, proxy, eps := initializeEndpoints(endpointAges, algorithmName, fadeInDuration) + + t.Log("test start", time.Now()) + var stats []string + stop := time.After(fadeInDuration) + // Emulate the load balancer loop, sending requests to it with random hash keys + // over and over again till fadeIn period is over. + func() { + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + for { + ep := proxy.selectEndpoint(&context{route: route, request: &http.Request{}, stateBag: map[string]interface{}{loadbalancer.ConsistentHashKey: strconv.Itoa(rnd.Intn(100000))}}) + stats = append(stats, ep.Host) + select { + case <-stop: + return + default: + } + } + }() + + // Split fade-in period into buckets and count how many times each endpoint was selected. + t.Log("test done", time.Now()) + t.Log("CSV timestamp," + strings.Join(eps, ",")) + bucketSize := len(stats) / bucketCount + var allBuckets []map[string]int + for i := 0; i < bucketCount; i++ { + bucketStats := make(map[string]int) + for j := i * bucketSize; j < (i+1)*bucketSize; j++ { + bucketStats[stats[j]]++ + } + + allBuckets = append(allBuckets, bucketStats) + } + + directions := make(map[string]int) + for _, epi := range eps { + first := allBuckets[0][epi] + last := allBuckets[len(allBuckets)-1][epi] + t := tolerance(first, last) + switch { + case last-first > t: + directions[epi] = 1 + case last-first < t: + directions[epi] = -1 + } + } + + for i := range allBuckets { + // trim first and last (warmup and settling) + if i < 2 || i == len(allBuckets)-1 { + continue + } + + for _, epi := range eps { + if !checkMonotony( + directions[epi], + allBuckets[i-1][epi], + allBuckets[i][epi], + ) { + t.Error("non-monotonic change", epi, i) + } + } + } + + for i, bucketStats := range allBuckets { + var showStats []string + for _, epi := range eps { + showStats = append(showStats, fmt.Sprintf("%d", bucketStats[epi])) + } + + // Print CSV-like output for, where row number represents time and + // column represents endpoint. You can visualize it using + // ./skptesting/run_fadein_test.sh from the skipper repo root. + t.Log("CSV " + fmt.Sprintf("%d,", i) + strings.Join(showStats, ",")) + } + }) +} + +// Those tests check that the amount of requests per period for each endpoint is monotonical over the time. +// For every endpoint, it could increase, decrease or stay the same. +func TestFadeInMonotony(t *testing.T) { + old := 2.0 + testFadeInMonotony(t, "power-of-n-random-choices, 0", loadbalancer.PowerOfRandomNChoices.String(), old, old) + testFadeInMonotony(t, "power-of-n-random-choices, 1", loadbalancer.PowerOfRandomNChoices.String(), 0, old) + testFadeInMonotony(t, "power-of-n-random-choices, 2", loadbalancer.PowerOfRandomNChoices.String(), 0, 0) + testFadeInMonotony(t, "power-of-n-random-choices, 3", loadbalancer.PowerOfRandomNChoices.String(), old, 0) + testFadeInMonotony(t, "power-of-n-random-choices, 4", loadbalancer.PowerOfRandomNChoices.String(), old, old, old, 0) + testFadeInMonotony(t, "power-of-n-random-choices, 5", loadbalancer.PowerOfRandomNChoices.String(), old, old, old, 0, 0, 0) + testFadeInMonotony(t, "power-of-n-random-choices, 6", loadbalancer.PowerOfRandomNChoices.String(), old, 0, 0, 0) + testFadeInMonotony(t, "power-of-n-random-choices, 7", loadbalancer.PowerOfRandomNChoices.String(), old, 0, 0, 0, 0, 0, 0) + testFadeInMonotony(t, "power-of-n-random-choices, 8", loadbalancer.PowerOfRandomNChoices.String(), 0, 0, 0, 0, 0, 0) + testFadeInMonotony(t, "power-of-n-random-choices, 9", loadbalancer.PowerOfRandomNChoices.String(), 1.0/2.0, 1.0/3.0, 1.0/4.0) + + testFadeInMonotony(t, "round-robin, 0", loadbalancer.RoundRobin.String(), old, old) + testFadeInMonotony(t, "round-robin, 1", loadbalancer.RoundRobin.String(), 0, old) + testFadeInMonotony(t, "round-robin, 2", loadbalancer.RoundRobin.String(), 0, 0) + testFadeInMonotony(t, "round-robin, 3", loadbalancer.RoundRobin.String(), old, 0) + testFadeInMonotony(t, "round-robin, 4", loadbalancer.RoundRobin.String(), old, old, old, 0) + testFadeInMonotony(t, "round-robin, 5", loadbalancer.RoundRobin.String(), old, old, old, 0, 0, 0) + testFadeInMonotony(t, "round-robin, 6", loadbalancer.RoundRobin.String(), old, 0, 0, 0) + testFadeInMonotony(t, "round-robin, 7", loadbalancer.RoundRobin.String(), old, 0, 0, 0, 0, 0, 0) + testFadeInMonotony(t, "round-robin, 8", loadbalancer.RoundRobin.String(), 0, 0, 0, 0, 0, 0) + testFadeInMonotony(t, "round-robin, 9", loadbalancer.RoundRobin.String(), 1.0/2.0, 1.0/3.0, 1.0/4.0) + + testFadeInMonotony(t, "random, 0", loadbalancer.Random.String(), old, old) + testFadeInMonotony(t, "random, 1", loadbalancer.Random.String(), 0, old) + testFadeInMonotony(t, "random, 2", loadbalancer.Random.String(), 0, 0) + testFadeInMonotony(t, "random, 3", loadbalancer.Random.String(), old, 0) + testFadeInMonotony(t, "random, 4", loadbalancer.Random.String(), old, old, old, 0) + testFadeInMonotony(t, "random, 5", loadbalancer.Random.String(), old, old, old, 0, 0, 0) + testFadeInMonotony(t, "random, 6", loadbalancer.Random.String(), old, 0, 0, 0) + testFadeInMonotony(t, "random, 7", loadbalancer.Random.String(), old, 0, 0, 0, 0, 0, 0) + testFadeInMonotony(t, "random, 8", loadbalancer.Random.String(), 0, 0, 0, 0, 0, 0) + testFadeInMonotony(t, "random, 9", loadbalancer.Random.String(), 1.0/2.0, 1.0/3.0, 1.0/4.0) + + testFadeInMonotony(t, "consistent-hash, 0", loadbalancer.ConsistentHash.String(), old, old) + testFadeInMonotony(t, "consistent-hash, 1", loadbalancer.ConsistentHash.String(), 0, old) + testFadeInMonotony(t, "consistent-hash, 2", loadbalancer.ConsistentHash.String(), 0, 0) + testFadeInMonotony(t, "consistent-hash, 3", loadbalancer.ConsistentHash.String(), old, 0) + testFadeInMonotony(t, "consistent-hash, 4", loadbalancer.ConsistentHash.String(), old, old, old, 0) + testFadeInMonotony(t, "consistent-hash, 5", loadbalancer.ConsistentHash.String(), old, old, old, 0, 0, 0) + testFadeInMonotony(t, "consistent-hash, 6", loadbalancer.ConsistentHash.String(), old, 0, 0, 0) + testFadeInMonotony(t, "consistent-hash, 7", loadbalancer.ConsistentHash.String(), old, 0, 0, 0, 0, 0, 0) + testFadeInMonotony(t, "consistent-hash, 8", loadbalancer.ConsistentHash.String(), 0, 0, 0, 0, 0, 0) + testFadeInMonotony(t, "consistent-hash, 9", loadbalancer.ConsistentHash.String(), 1.0/2.0, 1.0/3.0, 1.0/4.0) +} + +func testFadeInLoadBetweenOldAndNewEps( + t *testing.T, + name string, + algorithmName string, + nOld int, nNew int, +) { + t.Run(name, func(t *testing.T) { + const ( + numberOfReqs = 100000 + acceptableErrorNearZero = 10 + old = 1.0 + new = 0.0 + ) + endpointAges := []float64{} + for i := 0; i < nOld; i++ { + endpointAges = append(endpointAges, old) + } + for i := 0; i < nNew; i++ { + endpointAges = append(endpointAges, new) + } + + route, proxy, eps := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge) + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + nReqs := map[string]int{} + + t.Log("test start", time.Now()) + // Emulate the load balancer loop, sending requests to it with random hash keys + // over and over again till fadeIn period is over. + for i := 0; i < numberOfReqs; i++ { + ep := proxy.selectEndpoint(&context{route: route, request: &http.Request{}, stateBag: map[string]interface{}{loadbalancer.ConsistentHashKey: strconv.Itoa(rnd.Intn(100000))}}) + nReqs[ep.Host]++ + } + + if nOld == 0 { + expectedReqsPerEndpoint := numberOfReqs / nNew + for _, ep := range eps { + assert.InEpsilon(t, expectedReqsPerEndpoint, nReqs[ep], 0.2) + } + } else { + expectedReqsPerOldEndpoint := numberOfReqs / nOld + for idx, ep := range eps { + if endpointAges[idx] == old { + assert.InEpsilon(t, expectedReqsPerOldEndpoint, nReqs[ep], 0.2) + } + if endpointAges[idx] == new { + assert.InDelta(t, 0, nReqs[ep], acceptableErrorNearZero) + } + } + } + }) +} + +// Those tests check that the amount of requests per period for every endpoint at the very beginning of fading in (when all endpoints are new) +// and at the very end of fading in (when all endpoints are old) is correct. +func TestFadeInLoadBetweenOldAndNewEps(t *testing.T) { + for nOld := 0; nOld < 6; nOld++ { + for nNew := 0; nNew < 6; nNew++ { + if nOld == 0 && nNew == 0 { + continue + } + + testFadeInLoadBetweenOldAndNewEps(t, fmt.Sprintf("power-of-n-random-choices, %d old, %d new", nOld, nNew), loadbalancer.PowerOfRandomNChoices.String(), nOld, nNew) + testFadeInLoadBetweenOldAndNewEps(t, fmt.Sprintf("consistent-hash, %d old, %d new", nOld, nNew), loadbalancer.ConsistentHash.String(), nOld, nNew) + testFadeInLoadBetweenOldAndNewEps(t, fmt.Sprintf("random, %d old, %d new", nOld, nNew), loadbalancer.Random.String(), nOld, nNew) + testFadeInLoadBetweenOldAndNewEps(t, fmt.Sprintf("round-robin, %d old, %d new", nOld, nNew), loadbalancer.RoundRobin.String(), nOld, nNew) + } + } +} + +func testSelectEndpointEndsWhenAllEndpointsAreFading( + t *testing.T, + name string, + algorithmName string, + nEndpoints int, +) { + t.Run(name, func(t *testing.T) { + // Initialize every endpoint with zero: every endpoint is new + endpointAges := make([]float64, nEndpoints) + route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge) + applied := make(chan struct{}) + + go func() { + proxy.selectEndpoint(&context{route: route, request: &http.Request{}, stateBag: map[string]interface{}{loadbalancer.ConsistentHashKey: "someConstantString"}}) + close(applied) + }() + + select { + case <-time.After(time.Second): + t.Errorf("a.Apply has not finished in a reasonable time") + case <-applied: + break + } + }) +} + +func TestSelectEndpointEndsWhenAllEndpointsAreFading(t *testing.T) { + for nEndpoints := 1; nEndpoints < 10; nEndpoints++ { + testSelectEndpointEndsWhenAllEndpointsAreFading(t, "power-of-n-random-choices", loadbalancer.PowerOfRandomNChoices.String(), nEndpoints) + testSelectEndpointEndsWhenAllEndpointsAreFading(t, "consistent-hash", loadbalancer.ConsistentHash.String(), nEndpoints) + testSelectEndpointEndsWhenAllEndpointsAreFading(t, "random", loadbalancer.Random.String(), nEndpoints) + testSelectEndpointEndsWhenAllEndpointsAreFading(t, "round-robin", loadbalancer.RoundRobin.String(), nEndpoints) + } +} + +func benchmarkFadeIn( + b *testing.B, + name string, + algorithmName string, + clients int, + endpointAges ...float64, +) { + b.Run(name, func(b *testing.B) { + route, proxy, _ := initializeEndpoints(endpointAges, algorithmName, defaultFadeInDurationHuge) + var wg sync.WaitGroup + + // Emulate the load balancer loop, sending requests to it with random hash keys + // over and over again till fadeIn period is over. + b.ResetTimer() + for i := 0; i < clients; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + for j := 0; j < b.N/clients; j++ { + _ = proxy.selectEndpoint(&context{route: route, request: &http.Request{}, stateBag: map[string]interface{}{loadbalancer.ConsistentHashKey: strconv.Itoa(rnd.Intn(100000))}}) + } + }(i) + } + + wg.Wait() + }) +} + +func repeatedSlice(v float64, n int) []float64 { + var s []float64 + for i := 0; i < n; i++ { + s = append(s, v) + } + return s +} + +func BenchmarkFadeIn(b *testing.B) { + old := 2.0 + clients := []int{1, 4, 16, 64, 256} + for _, c := range clients { + benchmarkFadeIn(b, fmt.Sprintf("power-of-n-random-choices, 11, %d clients", c), loadbalancer.PowerOfRandomNChoices.String(), c, repeatedSlice(old, 200)...) + } + + for _, c := range clients { + benchmarkFadeIn(b, fmt.Sprintf("random, 11, %d clients", c), loadbalancer.Random.String(), c, repeatedSlice(old, 200)...) + } + + for _, c := range clients { + benchmarkFadeIn(b, fmt.Sprintf("round-robin, 11, %d clients", c), loadbalancer.RoundRobin.String(), c, repeatedSlice(old, 200)...) + } + + for _, c := range clients { + benchmarkFadeIn(b, fmt.Sprintf("consistent-hash, 11, %d clients", c), loadbalancer.ConsistentHash.String(), c, repeatedSlice(old, 200)...) + } +} diff --git a/proxy/proxy.go b/proxy/proxy.go index 45b19812be..b885e50540 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "math/rand" "net" "net/http" "net/http/httptrace" @@ -328,6 +329,7 @@ type Proxy struct { defaultHTTPStatus int routing *routing.Routing registry *routing.EndpointRegistry + rnd *rand.Rand roundTripper http.RoundTripper priorityRoutes []PriorityRoute flags Flags @@ -467,16 +469,19 @@ func setRequestURLForDynamicBackend(u *url.URL, stateBag map[string]interface{}) } } -func selectEndpoint(ctx *context, registry *routing.EndpointRegistry) *routing.LBEndpoint { +func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint { rt := ctx.route + endpoints := rt.LBEndpoints + endpoints = filterFadeIn(endpoints, rt, p.registry, p.rnd) lbctx := &routing.LBContext{ Request: ctx.request, Route: rt, - LBEndpoints: rt.LBEndpoints, + LBEndpoints: endpoints, Params: ctx.StateBag(), - Registry: registry, + Registry: p.registry, } + e := rt.LBAlgorithm.Apply(lbctx) return &e @@ -484,7 +489,7 @@ func selectEndpoint(ctx *context, registry *routing.EndpointRegistry) *routing.L // creates an outgoing http request to be forwarded to the route endpoint // based on the augmented incoming request -func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHeaders bool, registry *routing.EndpointRegistry) (*http.Request, routing.Metrics, error) { +func (p *Proxy) mapRequest(ctx *context, requestContext stdlibcontext.Context) (*http.Request, routing.Metrics, error) { var endpointMetrics routing.Metrics r := ctx.request rt := ctx.route @@ -497,12 +502,12 @@ func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHea setRequestURLFromRequest(u, r) setRequestURLForDynamicBackend(u, stateBag) case eskip.LBBackend: - endpoint := selectEndpoint(ctx, registry) + endpoint := p.selectEndpoint(ctx) endpointMetrics = endpoint.Metrics u.Scheme = endpoint.Scheme u.Host = endpoint.Host case eskip.NetworkBackend: - endpointMetrics = registry.GetMetrics(rt.Host) + endpointMetrics = p.registry.GetMetrics(rt.Host) fallthrough default: u.Scheme = rt.Scheme @@ -520,7 +525,7 @@ func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHea } rr.ContentLength = r.ContentLength - if removeHopHeaders { + if p.flags.HopHeadersRemoval() { rr.Header = cloneHeaderExcluding(r.Header, hopHeaders) } else { rr.Header = cloneHeader(r.Header) @@ -745,6 +750,8 @@ func WithParams(p Params) *Proxy { clientTLS: tr.TLSClientConfig, hostname: hostname, onPanicSometimes: rate.Sometimes{First: 3, Interval: 1 * time.Minute}, + /* #nosec */ + rnd: rand.New(loadbalancer.NewLockedSource()), } } @@ -840,7 +847,7 @@ func (p *Proxy) makeUpgradeRequest(ctx *context, req *http.Request) { } func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Context) (*http.Response, *proxyError) { - req, endpointMetrics, err := mapRequest(ctx, requestContext, p.flags.HopHeadersRemoval(), p.registry) + req, endpointMetrics, err := p.mapRequest(ctx, requestContext) if err != nil { return nil, &proxyError{err: fmt.Errorf("could not map backend request: %w", err)} } @@ -1112,7 +1119,7 @@ func (p *Proxy) do(ctx *context, parentSpan ot.Span) (err error) { ctx.setResponse(loopCTX.response, p.flags.PreserveOriginal()) ctx.proxySpan = loopCTX.proxySpan } else if p.flags.Debug() { - debugReq, _, err := mapRequest(ctx, ctx.request.Context(), p.flags.HopHeadersRemoval(), p.registry) + debugReq, _, err := p.mapRequest(ctx, ctx.request.Context()) if err != nil { perr := &proxyError{err: err} p.makeErrorResponse(ctx, perr) diff --git a/skptesting/run_fadein_test.sh b/skptesting/run_fadein_test.sh index 43ee2fc7ed..88d970188e 100755 --- a/skptesting/run_fadein_test.sh +++ b/skptesting/run_fadein_test.sh @@ -1,7 +1,7 @@ #!/bin/bash function run_test() { - go test ./loadbalancer -run="$1" -count=1 -v | awk '/fadein_test.go:[0-9]+: CSV/ {print $3}' + go test ./proxy -run="$1" -count=1 -v | awk '/fadein_internal_test.go:[0-9]+: CSV/ {print $3}' } cwd=$( dirname "${BASH_SOURCE[0]}" ) @@ -10,7 +10,7 @@ if [ -z "${1+x}" ] then echo "$0 [...]" echo "Example:" - echo "$0 TestFadeIn/round-robin,_4 TestFadeIn/round-robin,_3" + echo "$0 TestFadeInMonotony/round-robin,_4 TestFadeInMonotony/round-robin,_3" else d=$(mktemp -d) for t in "$@"