diff --git a/loadbalancer/algorithm.go b/loadbalancer/algorithm.go index 6972239e18..e3d33a72fe 100644 --- a/loadbalancer/algorithm.go +++ b/loadbalancer/algorithm.go @@ -3,7 +3,6 @@ package loadbalancer import ( "errors" "fmt" - "math" "math/rand" "net" "net/url" @@ -11,7 +10,6 @@ import ( "strings" "sync" "sync/atomic" - "time" "github.com/cespare/xxhash/v2" log "github.com/sirupsen/logrus" @@ -56,107 +54,13 @@ var ( defaultAlgorithm = newRoundRobin ) -func fadeInState(now time.Time, duration time.Duration, detected time.Time) (time.Duration, bool) { - rel := now.Sub(detected) - return rel, rel > 0 && rel < duration -} - -func fadeIn(now time.Time, duration time.Duration, exponent float64, detected time.Time) float64 { - rel, fadingIn := fadeInState(now, duration, detected) - if !fadingIn { - return 1 - } - - return math.Pow(float64(rel)/float64(duration), exponent) -} - -func shiftWeighted(rnd *rand.Rand, ctx *routing.LBContext, now time.Time) routing.LBEndpoint { - var sum float64 - rt := ctx.Route - ep := ctx.Route.LBEndpoints - for _, epi := range ep { - detected := ctx.Registry.GetMetrics(epi.Host).DetectedTime() - wi := fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, detected) - sum += wi - } - - choice := ep[len(ep)-1] - r := rnd.Float64() * sum - var upto float64 - for i, epi := range ep { - detected := ctx.Registry.GetMetrics(epi.Host).DetectedTime() - upto += fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, detected) - if upto > r { - choice = ep[i] - break - } - } - - return choice -} - -func shiftToRemaining(rnd *rand.Rand, ctx *routing.LBContext, wi []int, now time.Time) routing.LBEndpoint { - notFadingIndexes := wi - ep := ctx.Route.LBEndpoints - - // if all endpoints are fading, the simplest approach is to use the oldest, - // this departs from the desired curve, but guarantees monotonic fade-in. From - // the perspective of the oldest endpoint, this is temporarily the same as if - // there was no fade-in. - if len(notFadingIndexes) == 0 { - return shiftWeighted(rnd, ctx, now) - } - - // otherwise equally distribute between the old endpoints - return ep[notFadingIndexes[rnd.Intn(len(notFadingIndexes))]] -} - -func withFadeIn(rnd *rand.Rand, ctx *routing.LBContext, choice int, algo routing.LBAlgorithm) routing.LBEndpoint { - ep := ctx.Route.LBEndpoints - now := time.Now() - detected := ctx.Registry.GetMetrics(ctx.Route.LBEndpoints[choice].Host).DetectedTime() - f := fadeIn( - now, - ctx.Route.LBFadeInDuration, - ctx.Route.LBFadeInExponent, - detected, - ) - - if rnd.Float64() < f { - return ep[choice] - } - notFadingIndexes := make([]int, 0, len(ep)) - for i := 0; i < len(ep); i++ { - detected := ctx.Registry.GetMetrics(ep[i].Host).DetectedTime() - if _, fadingIn := fadeInState(now, ctx.Route.LBFadeInDuration, detected); !fadingIn { - notFadingIndexes = append(notFadingIndexes, i) - } - } - - switch a := algo.(type) { - case *roundRobin: - return shiftToRemaining(a.rnd, ctx, notFadingIndexes, now) - case *random: - return shiftToRemaining(a.rnd, ctx, notFadingIndexes, now) - case *consistentHash: - // If all endpoints are fading, normal consistent hash result - if len(notFadingIndexes) == 0 { - return ep[choice] - } - // otherwise calculate consistent hash again using endpoints which are not fading - return ep[a.chooseConsistentHashEndpoint(ctx, skipFadingEndpoints(notFadingIndexes))] - default: - return ep[choice] - } -} - type roundRobin struct { index int64 rnd *rand.Rand } func newRoundRobin(endpoints []string) routing.LBAlgorithm { - rnd := rand.New(newLockedSource()) // #nosec + rnd := rand.New(routing.NewLockedSource()) // #nosec return &roundRobin{ index: int64(rnd.Intn(len(endpoints))), rnd: rnd, @@ -165,17 +69,12 @@ func newRoundRobin(endpoints []string) routing.LBAlgorithm { // Apply implements routing.LBAlgorithm with a roundrobin algorithm. func (r *roundRobin) Apply(ctx *routing.LBContext) routing.LBEndpoint { - if len(ctx.Route.LBEndpoints) == 1 { - return ctx.Route.LBEndpoints[0] + if len(ctx.Endpoints) == 1 { + return ctx.Endpoints[0] } - index := int(atomic.AddInt64(&r.index, 1) % int64(len(ctx.Route.LBEndpoints))) - - if ctx.Route.LBFadeInDuration <= 0 { - return ctx.Route.LBEndpoints[index] - } - - return withFadeIn(r.rnd, ctx, index, r) + choice := int(atomic.AddInt64(&r.index, 1) % int64(len(ctx.Endpoints))) + return ctx.Endpoints[choice] } type random struct { @@ -185,22 +84,18 @@ type random struct { func newRandom(endpoints []string) routing.LBAlgorithm { // #nosec return &random{ - rnd: rand.New(newLockedSource()), + rnd: rand.New(routing.NewLockedSource()), } } // Apply implements routing.LBAlgorithm with a stateless random algorithm. func (r *random) Apply(ctx *routing.LBContext) routing.LBEndpoint { - if len(ctx.Route.LBEndpoints) == 1 { - return ctx.Route.LBEndpoints[0] + if len(ctx.Endpoints) == 1 { + return ctx.Endpoints[0] } - i := r.rnd.Intn(len(ctx.Route.LBEndpoints)) - if ctx.Route.LBFadeInDuration <= 0 { - return ctx.Route.LBEndpoints[i] - } - - return withFadeIn(r.rnd, ctx, i, r) + choice := r.rnd.Intn(len(ctx.Endpoints)) + return ctx.Endpoints[choice] } type ( @@ -221,7 +116,7 @@ func (ch *consistentHash) Swap(i, j int) { } func newConsistentHashInternal(endpoints []string, hashesPerEndpoint int) routing.LBAlgorithm { - rnd := rand.New(newLockedSource()) // #nosec + rnd := rand.New(routing.NewLockedSource()) // #nosec ch := &consistentHash{ hashRing: make([]endpointHash, hashesPerEndpoint*len(endpoints)), rnd: rnd, @@ -245,27 +140,24 @@ func hash(s string) uint64 { } // Returns index in hash ring with the closest hash to key's hash -func (ch *consistentHash) searchRing(key string, skipEndpoint func(int) bool) int { +func (ch *consistentHash) searchRing(key string) int { h := hash(key) i := sort.Search(ch.Len(), func(i int) bool { return ch.hashRing[i].hash >= h }) if i == ch.Len() { // rollover i = 0 } - for skipEndpoint(ch.hashRing[i].index) { - i = (i + 1) % ch.Len() - } return i } // Returns index of endpoint with closest hash to key's hash -func (ch *consistentHash) search(key string, skipEndpoint func(int) bool) int { - ringIndex := ch.searchRing(key, skipEndpoint) +func (ch *consistentHash) search(key string) int { + ringIndex := ch.searchRing(key) return ch.hashRing[ringIndex].index } func computeLoadAverage(ctx *routing.LBContext) float64 { sum := 1.0 // add 1 to include the request that just arrived - endpoints := ctx.Route.LBEndpoints + endpoints := ctx.Endpoints for _, v := range endpoints { sum += float64(v.Metrics.GetInflightRequests()) } @@ -274,17 +166,14 @@ func computeLoadAverage(ctx *routing.LBContext) float64 { // Returns index of endpoint with closest hash to key's hash, which is also below the target load // skipEndpoint function is used to skip endpoints we don't want, such as fading endpoints -func (ch *consistentHash) boundedLoadSearch(key string, balanceFactor float64, ctx *routing.LBContext, skipEndpoint func(int) bool) int { - ringIndex := ch.searchRing(key, skipEndpoint) +func (ch *consistentHash) boundedLoadSearch(key string, balanceFactor float64, ctx *routing.LBContext) int { + ringIndex := ch.searchRing(key) averageLoad := computeLoadAverage(ctx) targetLoad := averageLoad * balanceFactor // Loop round ring, starting at endpoint with closest hash. Stop when we find one whose load is less than targetLoad. for i := 0; i < ch.Len(); i++ { endpointIndex := ch.hashRing[ringIndex].index - if skipEndpoint(endpointIndex) { - continue - } - load := ctx.Route.LBEndpoints[endpointIndex].Metrics.GetInflightRequests() + load := ctx.Endpoints[endpointIndex].Metrics.GetInflightRequests() // We know there must be an endpoint whose load <= average load. // Since targetLoad >= average load (balancerFactor >= 1), there must also be an endpoint with load <= targetLoad. if load <= int(targetLoad) { @@ -298,20 +187,15 @@ func (ch *consistentHash) boundedLoadSearch(key string, balanceFactor float64, c // Apply implements routing.LBAlgorithm with a consistent hash algorithm. func (ch *consistentHash) Apply(ctx *routing.LBContext) routing.LBEndpoint { - if len(ctx.Route.LBEndpoints) == 1 { - return ctx.Route.LBEndpoints[0] - } - - choice := ch.chooseConsistentHashEndpoint(ctx, noSkippedEndpoints) - - if ctx.Route.LBFadeInDuration <= 0 { - return ctx.Route.LBEndpoints[choice] + if len(ctx.Endpoints) == 1 { + return ctx.Endpoints[0] } - return withFadeIn(ch.rnd, ctx, choice, ch) + choice := ch.chooseConsistentHashEndpoint(ctx) + return ctx.Endpoints[choice] } -func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext, skipEndpoint func(int) bool) int { +func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext) int { key, ok := ctx.Params[ConsistentHashKey].(string) if !ok { key = snet.RemoteHost(ctx.Request).String() @@ -319,29 +203,14 @@ func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext, s balanceFactor, ok := ctx.Params[ConsistentHashBalanceFactor].(float64) var choice int if !ok { - choice = ch.search(key, skipEndpoint) + choice = ch.search(key) } else { - choice = ch.boundedLoadSearch(key, balanceFactor, ctx, skipEndpoint) + choice = ch.boundedLoadSearch(key, balanceFactor, ctx) } return choice } -func skipFadingEndpoints(notFadingEndpoints []int) func(int) bool { - return func(i int) bool { - for _, notFadingEndpoint := range notFadingEndpoints { - if i == notFadingEndpoint { - return false - } - } - return true - } -} - -func noSkippedEndpoints(_ int) bool { - return false -} - type powerOfRandomNChoices struct { mx sync.Mutex rnd *rand.Rand @@ -350,7 +219,7 @@ type powerOfRandomNChoices struct { // newPowerOfRandomNChoices selects N random backends and picks the one with less outstanding requests. func newPowerOfRandomNChoices([]string) routing.LBAlgorithm { - rnd := rand.New(newLockedSource()) // #nosec + rnd := rand.New(routing.NewLockedSource()) // #nosec return &powerOfRandomNChoices{ rnd: rnd, numberOfChoices: powerOfRandomNChoicesDefaultN, @@ -359,15 +228,15 @@ func newPowerOfRandomNChoices([]string) routing.LBAlgorithm { // Apply implements routing.LBAlgorithm with power of random N choices algorithm. func (p *powerOfRandomNChoices) Apply(ctx *routing.LBContext) routing.LBEndpoint { - ne := len(ctx.Route.LBEndpoints) + ne := len(ctx.Endpoints) p.mx.Lock() defer p.mx.Unlock() - best := ctx.Route.LBEndpoints[p.rnd.Intn(ne)] + best := ctx.Endpoints[p.rnd.Intn(ne)] for i := 1; i < p.numberOfChoices; i++ { - ce := ctx.Route.LBEndpoints[p.rnd.Intn(ne)] + ce := ctx.Endpoints[p.rnd.Intn(ne)] if p.getScore(ce) > p.getScore(best) { best = ce diff --git a/loadbalancer/algorithm_test.go b/loadbalancer/algorithm_test.go index ca684ebc71..57b4b873ea 100644 --- a/loadbalancer/algorithm_test.go +++ b/loadbalancer/algorithm_test.go @@ -258,9 +258,9 @@ func TestApply(t *testing.T) { rt := p.Do([]*routing.Route{r}) lbctx := &routing.LBContext{ - Request: req, - Route: rt[0], - Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}), + Request: req, + Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}), + Endpoints: rt[0].LBEndpoints, } h := make(map[string]int) @@ -279,7 +279,7 @@ func TestApply(t *testing.T) { func TestConsistentHashSearch(t *testing.T) { apply := func(key string, endpoints []string) string { ch := newConsistentHash(endpoints).(*consistentHash) - return endpoints[ch.search(key, noSkippedEndpoints)] + return endpoints[ch.search(key)] } endpoints := []string{"http://127.0.0.1:8080", "http://127.0.0.2:8080", "http://127.0.0.3:8080"} @@ -316,13 +316,13 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) { }})[0] ch := route.LBAlgorithm.(*consistentHash) ctx := &routing.LBContext{ - Request: r, - Route: route, - Params: map[string]interface{}{ConsistentHashBalanceFactor: 1.25}, - Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}), + Request: r, + Params: map[string]interface{}{ConsistentHashBalanceFactor: 1.25}, + Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}), + Endpoints: route.LBEndpoints, } noLoad := ch.Apply(ctx) - nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, Params: map[string]interface{}{}}) + nonBounded := ch.Apply(&routing.LBContext{Request: r, Params: map[string]interface{}{}, Endpoints: route.LBEndpoints}) if noLoad != nonBounded { t.Error("When no endpoints are overloaded, the chosen endpoint should be the same as standard consistentHash") @@ -364,8 +364,8 @@ func TestConsistentHashKey(t *testing.T) { }, }})[0] - defaultEndpoint := ch.Apply(&routing.LBContext{Request: r, Route: rt, Params: make(map[string]interface{})}) - remoteHostEndpoint := ch.Apply(&routing.LBContext{Request: r, Route: rt, Params: map[string]interface{}{ConsistentHashKey: net.RemoteHost(r).String()}}) + defaultEndpoint := ch.Apply(&routing.LBContext{Request: r, Params: make(map[string]interface{}), Endpoints: rt.LBEndpoints}) + remoteHostEndpoint := ch.Apply(&routing.LBContext{Request: r, Params: map[string]interface{}{ConsistentHashKey: net.RemoteHost(r).String()}, Endpoints: rt.LBEndpoints}) if defaultEndpoint != remoteHostEndpoint { t.Error("remote host should be used as a default key") @@ -373,7 +373,7 @@ func TestConsistentHashKey(t *testing.T) { for i, ep := range endpoints { key := fmt.Sprintf("%s-%d", ep, 1) // "ep-0" to "ep-99" is the range of keys for this endpoint. If we use this as the hash key it should select endpoint ep. - selected := ch.Apply(&routing.LBContext{Request: r, Route: rt, Params: map[string]interface{}{ConsistentHashKey: key}}) + selected := ch.Apply(&routing.LBContext{Request: r, Params: map[string]interface{}{ConsistentHashKey: key}, Endpoints: rt.LBEndpoints}) if selected != rt.LBEndpoints[i] { t.Errorf("expected: %v, got %v", rt.LBEndpoints[i], selected) } @@ -393,10 +393,10 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) { ch := route.LBAlgorithm.(*consistentHash) balanceFactor := 1.25 ctx := &routing.LBContext{ - Request: r, - Route: route, - Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor}, - Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}), + Request: r, + Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor}, + Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}), + Endpoints: route.LBEndpoints, } for i := 0; i < 100; i++ { diff --git a/loadbalancer/fadein_test.go b/loadbalancer/fadein_test.go deleted file mode 100644 index 4ba3dc4930..0000000000 --- a/loadbalancer/fadein_test.go +++ /dev/null @@ -1,387 +0,0 @@ -package loadbalancer - -import ( - "fmt" - "math/rand" - "strconv" - "strings" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/zalando/skipper/routing" -) - -const ( - fadeInDuration = 500 * time.Millisecond - fadeInDurationHuge = 24 * time.Hour // we need this to be sure we're at the very beginning of fading in - bucketCount = 20 - monotonyTolerance = 0.4 // we need to use a high tolerance for CI testing -) - -func absint(i int) int { - if i < 0 { - return -i - } - - return i -} - -func tolerance(prev, next int) int { - return int(float64(prev+next) * monotonyTolerance / 2) -} - -func checkMonotony(direction, prev, next int) bool { - t := tolerance(prev, next) - switch direction { - case 1: - return next-prev >= -t - case -1: - return next-prev <= t - default: - return absint(next-prev) < t - } -} - -func initializeEndpoints(endpointAges []time.Duration, fadeInDuration time.Duration) (*routing.LBContext, []string) { - var detectionTimes []time.Time - now := time.Now() - for _, ea := range endpointAges { - detectionTimes = append(detectionTimes, now.Add(-ea)) - } - - var eps []string - for i := range endpointAges { - eps = append(eps, fmt.Sprintf("ep-%d-%s.test", i, endpointAges[i])) - } - - ctx := &routing.LBContext{ - Params: map[string]interface{}{}, - Route: &routing.Route{ - LBFadeInDuration: fadeInDuration, - LBFadeInExponent: 1, - }, - Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}), - } - - for i := range eps { - ctx.Route.LBEndpoints = append(ctx.Route.LBEndpoints, routing.LBEndpoint{ - Host: eps[i], - Detected: detectionTimes[i], - }) - ctx.Registry.SetDetectedTime(eps[i], detectionTimes[i]) - } - - return ctx, eps -} - -func testFadeIn( - t *testing.T, - name string, - algorithm func([]string) routing.LBAlgorithm, - endpointAges ...time.Duration, -) { - t.Run(name, func(t *testing.T) { - ctx, eps := initializeEndpoints(endpointAges, fadeInDuration) - - a := algorithm(eps) - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - t.Log("test start", time.Now()) - var stats []string - stop := time.After(fadeInDuration) - // Emulate the load balancer loop, sending requests to it with random hash keys - // over and over again till fadeIn period is over. - func() { - for { - ctx.Params[ConsistentHashKey] = strconv.Itoa(rnd.Intn(100000)) - ep := a.Apply(ctx) - stats = append(stats, ep.Host) - select { - case <-stop: - return - default: - } - } - }() - - // Split fade-in period into buckets and count how many times each endpoint was selected. - t.Log("test done", time.Now()) - t.Log("CSV timestamp," + strings.Join(eps, ",")) - bucketSize := len(stats) / bucketCount - var allBuckets []map[string]int - for i := 0; i < bucketCount; i++ { - bucketStats := make(map[string]int) - for j := i * bucketSize; j < (i+1)*bucketSize; j++ { - bucketStats[stats[j]]++ - } - - allBuckets = append(allBuckets, bucketStats) - } - - directions := make(map[string]int) - for _, epi := range eps { - first := allBuckets[0][epi] - last := allBuckets[len(allBuckets)-1][epi] - t := tolerance(first, last) - switch { - case last-first > t: - directions[epi] = 1 - case last-first < t: - directions[epi] = -1 - } - } - - for i := range allBuckets { - // trim first and last (warmup and settling) - if i < 2 || i == len(allBuckets)-1 { - continue - } - - for _, epi := range eps { - if !checkMonotony( - directions[epi], - allBuckets[i-1][epi], - allBuckets[i][epi], - ) { - t.Error("non-monotonic change", epi, i) - } - } - } - - for i, bucketStats := range allBuckets { - var showStats []string - for _, epi := range eps { - showStats = append(showStats, fmt.Sprintf("%d", bucketStats[epi])) - } - - // Print CSV-like output for, where row number represents time and - // column represents endpoint. You can visualize it using - // ./skptesting/run_fadein_test.sh from the skipper repo root. - t.Log("CSV " + fmt.Sprintf("%d,", i) + strings.Join(showStats, ",")) - } - }) -} - -func newConsistentHashForTest(endpoints []string) routing.LBAlgorithm { - // The default parameter 100 is too small to get even distribution - return newConsistentHashInternal(endpoints, 1000) -} - -func TestFadeIn(t *testing.T) { - old := 2 * fadeInDuration - testFadeIn(t, "round-robin, 0", newRoundRobin, old, old) - testFadeIn(t, "round-robin, 1", newRoundRobin, 0, old) - testFadeIn(t, "round-robin, 2", newRoundRobin, 0, 0) - testFadeIn(t, "round-robin, 3", newRoundRobin, old, 0) - testFadeIn(t, "round-robin, 4", newRoundRobin, old, old, old, 0) - testFadeIn(t, "round-robin, 5", newRoundRobin, old, old, old, 0, 0, 0) - testFadeIn(t, "round-robin, 6", newRoundRobin, old, 0, 0, 0) - testFadeIn(t, "round-robin, 7", newRoundRobin, old, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "round-robin, 8", newRoundRobin, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "round-robin, 9", newRoundRobin, fadeInDuration/2, fadeInDuration/3, fadeInDuration/4) - - testFadeIn(t, "random, 0", newRandom, old, old) - testFadeIn(t, "random, 1", newRandom, 0, old) - testFadeIn(t, "random, 2", newRandom, 0, 0) - testFadeIn(t, "random, 3", newRandom, old, 0) - testFadeIn(t, "random, 4", newRandom, old, old, old, 0) - testFadeIn(t, "random, 5", newRandom, old, old, old, 0, 0, 0) - testFadeIn(t, "random, 6", newRandom, old, 0, 0, 0) - testFadeIn(t, "random, 7", newRandom, old, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "random, 8", newRandom, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "random, 9", newRandom, fadeInDuration/2, fadeInDuration/3, fadeInDuration/4) - - testFadeIn(t, "consistent-hash, 0", newConsistentHashForTest, old, old) - testFadeIn(t, "consistent-hash, 1", newConsistentHashForTest, 0, old) - testFadeIn(t, "consistent-hash, 2", newConsistentHashForTest, 0, 0) - testFadeIn(t, "consistent-hash, 3", newConsistentHashForTest, old, 0) - testFadeIn(t, "consistent-hash, 4", newConsistentHashForTest, old, old, old, 0) - testFadeIn(t, "consistent-hash, 5", newConsistentHashForTest, old, old, old, 0, 0, 0) - testFadeIn(t, "consistent-hash, 6", newConsistentHashForTest, old, 0, 0, 0) - testFadeIn(t, "consistent-hash, 7", newConsistentHashForTest, old, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "consistent-hash, 8", newConsistentHashForTest, 0, 0, 0, 0, 0, 0) - testFadeIn(t, "consistent-hash, 9", newConsistentHashForTest, fadeInDuration/2, fadeInDuration/3, fadeInDuration/4) -} - -func testFadeInLoadBetweenOldEps( - t *testing.T, - name string, - algorithm func([]string) routing.LBAlgorithm, - nOld int, nNew int, -) { - t.Run(name, func(t *testing.T) { - const ( - numberOfReqs = 100000 - acceptableErrorNearZero = 10 - old = fadeInDurationHuge - new = time.Duration(0) - ) - endpointAges := []time.Duration{} - for i := 0; i < nOld; i++ { - endpointAges = append(endpointAges, old) - } - for i := 0; i < nNew; i++ { - endpointAges = append(endpointAges, new) - } - - ctx, eps := initializeEndpoints(endpointAges, fadeInDurationHuge) - - a := algorithm(eps) - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - nReqs := map[string]int{} - - t.Log("test start", time.Now()) - // Emulate the load balancer loop, sending requests to it with random hash keys - // over and over again till fadeIn period is over. - for i := 0; i < numberOfReqs; i++ { - ctx.Params[ConsistentHashKey] = strconv.Itoa(rnd.Intn(100000)) - ep := a.Apply(ctx) - nReqs[ep.Host]++ - } - - expectedReqsPerOldEndpoint := numberOfReqs / nOld - for idx, ep := range eps { - if endpointAges[idx] == old { - assert.InEpsilon(t, expectedReqsPerOldEndpoint, nReqs[ep], 0.2) - } - if endpointAges[idx] == new { - assert.InDelta(t, 0, nReqs[ep], acceptableErrorNearZero) - } - } - }) -} - -func TestFadeInLoadBetweenOldEps(t *testing.T) { - for nOld := 1; nOld < 6; nOld++ { - for nNew := 0; nNew < 6; nNew++ { - testFadeInLoadBetweenOldEps(t, fmt.Sprintf("consistent-hash, %d old, %d new", nOld, nNew), newConsistentHash, nOld, nNew) - testFadeInLoadBetweenOldEps(t, fmt.Sprintf("random, %d old, %d new", nOld, nNew), newRandom, nOld, nNew) - testFadeInLoadBetweenOldEps(t, fmt.Sprintf("round-robin, %d old, %d new", nOld, nNew), newRoundRobin, nOld, nNew) - } - } -} - -func testApplyEndsWhenAllEndpointsAreFading( - t *testing.T, - name string, - algorithm func([]string) routing.LBAlgorithm, - nEndpoints int, -) { - t.Run(name, func(t *testing.T) { - // Initialize every endpoint with zero: every endpoint is new - endpointAges := make([]time.Duration, nEndpoints) - - ctx, eps := initializeEndpoints(endpointAges, fadeInDurationHuge) - - a := algorithm(eps) - ctx.Params[ConsistentHashKey] = "someConstantString" - applied := make(chan struct{}) - - go func() { - a.Apply(ctx) - close(applied) - }() - - select { - case <-time.After(time.Second): - t.Errorf("a.Apply has not finished in a reasonable time") - case <-applied: - break - } - }) -} - -func TestApplyEndsWhenAllEndpointsAreFading(t *testing.T) { - for nEndpoints := 1; nEndpoints < 10; nEndpoints++ { - testApplyEndsWhenAllEndpointsAreFading(t, "consistent-hash", newConsistentHash, nEndpoints) - testApplyEndsWhenAllEndpointsAreFading(t, "random", newRandom, nEndpoints) - testApplyEndsWhenAllEndpointsAreFading(t, "round-robin", newRoundRobin, nEndpoints) - } -} - -func benchmarkFadeIn( - b *testing.B, - name string, - algorithm func([]string) routing.LBAlgorithm, - clients int, - endpointAges ...time.Duration, -) { - b.Run(name, func(b *testing.B) { - var detectionTimes []time.Time - now := time.Now() - for _, ea := range endpointAges { - detectionTimes = append(detectionTimes, now.Add(-ea)) - } - - var eps []string - for i := range endpointAges { - eps = append(eps, string('a'+rune(i))) - } - - a := algorithm(eps) - - route := &routing.Route{ - LBFadeInDuration: fadeInDuration, - LBFadeInExponent: 1, - } - registry := routing.NewEndpointRegistry(routing.RegistryOptions{}) - for i := range eps { - route.LBEndpoints = append(route.LBEndpoints, routing.LBEndpoint{ - Host: eps[i], - Detected: detectionTimes[i], - }) - registry.SetDetectedTime(eps[i], detectionTimes[i]) - } - - var wg sync.WaitGroup - - // Emulate the load balancer loop, sending requests to it with random hash keys - // over and over again till fadeIn period is over. - b.ResetTimer() - for i := 0; i < clients; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - ctx := &routing.LBContext{ - Params: map[string]interface{}{}, - Route: route, - Registry: registry, - } - - for j := 0; j < b.N/clients; j++ { - ctx.Params[ConsistentHashKey] = strconv.Itoa(rnd.Intn(100000)) - _ = a.Apply(ctx) - } - }(i) - } - - wg.Wait() - }) -} - -func repeatedSlice(v time.Duration, n int) []time.Duration { - var s []time.Duration - for i := 0; i < n; i++ { - s = append(s, v) - } - return s -} - -func BenchmarkFadeIn(b *testing.B) { - old := 2 * fadeInDuration - clients := []int{1, 4, 16, 64, 256} - for _, c := range clients { - benchmarkFadeIn(b, fmt.Sprintf("random, 11, %d clients", c), newRandom, c, repeatedSlice(old, 200)...) - } - - for _, c := range clients { - benchmarkFadeIn(b, fmt.Sprintf("round-robin, 11, %d clients", c), newRoundRobin, c, repeatedSlice(old, 200)...) - } - - for _, c := range clients { - benchmarkFadeIn(b, fmt.Sprintf("consistent-hash, 11, %d clients", c), newConsistentHash, c, repeatedSlice(old, 200)...) - } -} diff --git a/proxy/proxy.go b/proxy/proxy.go index 198d67c859..353eb7df70 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "math" "net" "net/http" "net/http/httptrace" @@ -468,7 +469,42 @@ func setRequestURLForDynamicBackend(u *url.URL, stateBag map[string]interface{}) } } +func fadeIn(now time.Time, duration time.Duration, exponent float64, detected time.Time) float64 { + rel := now.Sub(detected) + fadingIn := rel > 0 && rel < duration + if !fadingIn { + return 1 + } + + return math.Pow(float64(rel)/float64(duration), exponent) +} + +func filterFadeIn(rt *routing.Route, lbctx *routing.LBContext) []routing.LBEndpoint { + now := time.Now() + + filtered := []routing.LBEndpoint{} + for _, e := range lbctx.Endpoints { + detected := lbctx.Registry.GetMetrics(e.Host).DetectedTime() + f := fadeIn( + now, + rt.LBFadeInDuration, + rt.LBFadeInExponent, + detected, + ) + if rt.Rnd.Float64() < f { + filtered = append(filtered, e) + } + } + return filtered +} + func setRequestURLForLoadBalancedBackend(u *url.URL, rt *routing.Route, lbctx *routing.LBContext) *routing.LBEndpoint { + lbctx.Endpoints = rt.LBEndpoints + lbctx.Endpoints = filterFadeIn(rt, lbctx) + if len(lbctx.Endpoints) == 0 { + lbctx.Endpoints = rt.LBEndpoints + } + e := rt.LBAlgorithm.Apply(lbctx) u.Scheme = e.Scheme u.Host = e.Host @@ -490,7 +526,7 @@ func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHea setRequestURLFromRequest(u, r) setRequestURLForDynamicBackend(u, stateBag) case eskip.LBBackend: - endpoint = setRequestURLForLoadBalancedBackend(u, rt, &routing.LBContext{Request: r, Route: rt, Params: stateBag, Registry: registry}) + endpoint = setRequestURLForLoadBalancedBackend(u, rt, &routing.LBContext{Request: r, Params: stateBag, Registry: registry}) default: u.Scheme = rt.Scheme u.Host = rt.Host diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 63d11a0638..84333412b9 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -2268,6 +2268,286 @@ func TestProxyFromEmptyContext(t *testing.T) { assert.Nil(t, proxyUrl) } +const ( + fadeInDuration = 500 * time.Millisecond + fadeInDurationHuge = 24 * time.Hour // we need this to be sure we're at the very beginning of fading in + bucketCount = 20 + monotonyTolerance = 0.4 // we need to use a high tolerance for CI testing +) + +func absint(i int) int { + if i < 0 { + return -i + } + + return i +} + +func tolerance(prev, next int) int { + return int(float64(prev+next) * monotonyTolerance / 2) +} + +func checkMonotony(direction, prev, next int) bool { + t := tolerance(prev, next) + switch direction { + case 1: + return next-prev >= -t + case -1: + return next-prev <= t + default: + return absint(next-prev) < t + } +} + +func initializeEndpoints(endpointAges []time.Duration, fadeInDuration time.Duration) (*routing.LBContext, *routing.Route, []string) { + var detectionTimes []time.Time + now := time.Now() + for _, ea := range endpointAges { + detectionTimes = append(detectionTimes, now.Add(-ea)) + } + + var eps []string + for i := range endpointAges { + eps = append(eps, fmt.Sprintf("http://ep-%d-%s.test", i, endpointAges[i])) + } + + registry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + eskipRoute := eskip.Route{BackendType: eskip.LBBackend} + for i := range eps { + eskipRoute.LBEndpoints = append(eskipRoute.LBEndpoints, eps[i]) + registry.SetDetectedTime(eps[i], detectionTimes[i]) + } + + route := &routing.Route{ + Route: eskipRoute, + LBFadeInDuration: fadeInDuration, + LBFadeInExponent: 1, + LBEndpoints: []routing.LBEndpoint{}, + Rnd: rand.New(rand.NewSource(time.Now().UnixNano())), // #nosec + } + + rt := loadbalancer.NewAlgorithmProvider().Do([]*routing.Route{route}) + route = rt[0] + ctx := &routing.LBContext{ + Params: map[string]interface{}{}, + Registry: registry, + } + + eps = []string{} + for i := range endpointAges { + eps = append(eps, fmt.Sprintf("ep-%d-%s.test:80", i, endpointAges[i])) + registry.SetDetectedTime(eps[i], detectionTimes[i]) + } + return ctx, route, eps +} + +func testFadeIn( + t *testing.T, + name string, + endpointAges ...time.Duration, +) { + t.Run(name, func(t *testing.T) { + ctx, route, eps := initializeEndpoints(endpointAges, fadeInDuration) + + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + t.Log("test start", time.Now()) + var stats []string + stop := time.After(fadeInDuration) + // Emulate the load balancer loop, sending requests to it with random hash keys + // over and over again till fadeIn period is over. + func() { + for { + ctx.Params[loadbalancer.ConsistentHashKey] = strconv.Itoa(rnd.Intn(100000)) + ep := setRequestURLForLoadBalancedBackend(&url.URL{}, route, ctx) + stats = append(stats, ep.Host) + select { + case <-stop: + return + default: + } + } + }() + + // Split fade-in period into buckets and count how many times each endpoint was selected. + t.Log("test done", time.Now()) + t.Log("CSV timestamp," + strings.Join(eps, ",")) + bucketSize := len(stats) / bucketCount + var allBuckets []map[string]int + for i := 0; i < bucketCount; i++ { + bucketStats := make(map[string]int) + for j := i * bucketSize; j < (i+1)*bucketSize; j++ { + bucketStats[stats[j]]++ + } + + allBuckets = append(allBuckets, bucketStats) + } + + directions := make(map[string]int) + for _, epi := range eps { + first := allBuckets[0][epi] + last := allBuckets[len(allBuckets)-1][epi] + t := tolerance(first, last) + switch { + case last-first > t: + directions[epi] = 1 + case last-first < t: + directions[epi] = -1 + } + } + + for i := range allBuckets { + // trim first and last (warmup and settling) + if i < 2 || i == len(allBuckets)-1 { + continue + } + + for _, epi := range eps { + if !checkMonotony( + directions[epi], + allBuckets[i-1][epi], + allBuckets[i][epi], + ) { + t.Error("non-monotonic change", epi, i) + } + } + } + + for i, bucketStats := range allBuckets { + var showStats []string + for _, epi := range eps { + showStats = append(showStats, fmt.Sprintf("%d", bucketStats[epi])) + } + + // Print CSV-like output for, where row number represents time and + // column represents endpoint. You can visualize it using + // ./skptesting/run_fadein_test.sh from the skipper repo root. + t.Log("CSV " + fmt.Sprintf("%d,", i) + strings.Join(showStats, ",")) + } + }) +} + +func TestFadeIn(t *testing.T) { + old := 2 * fadeInDuration + testFadeIn(t, "round-robin, 0", old, old) + testFadeIn(t, "round-robin, 1", 0, old) + testFadeIn(t, "round-robin, 2", 0, 0) + testFadeIn(t, "round-robin, 3", old, 0) + testFadeIn(t, "round-robin, 4", old, old, old, 0) + testFadeIn(t, "round-robin, 5", old, old, old, 0, 0, 0) + testFadeIn(t, "round-robin, 6", old, 0, 0, 0) + testFadeIn(t, "round-robin, 7", old, 0, 0, 0, 0, 0, 0) + testFadeIn(t, "round-robin, 8", 0, 0, 0, 0, 0, 0) + testFadeIn(t, "round-robin, 9", fadeInDuration/2, fadeInDuration/3, fadeInDuration/4) +} + +func testApplyEndsWhenAllEndpointsAreFading( + t *testing.T, + name string, + nEndpoints int, +) { + t.Run(name, func(t *testing.T) { + // Initialize every endpoint with zero: every endpoint is new + endpointAges := make([]time.Duration, nEndpoints) + + ctx, route, _ := initializeEndpoints(endpointAges, fadeInDurationHuge) + + ctx.Params[loadbalancer.ConsistentHashKey] = "someConstantString" + applied := make(chan struct{}) + + go func() { + setRequestURLForLoadBalancedBackend(&url.URL{}, route, ctx) + close(applied) + }() + + select { + case <-time.After(time.Second): + t.Errorf("a.Apply has not finished in a reasonable time") + case <-applied: + break + } + }) +} + +func TestApplyEndsWhenAllEndpointsAreFading(t *testing.T) { + for nEndpoints := 1; nEndpoints < 10; nEndpoints++ { + testApplyEndsWhenAllEndpointsAreFading(t, "round-robin", nEndpoints) + } +} + +func benchmarkFadeIn( + b *testing.B, + name string, + clients int, + endpointAges ...time.Duration, +) { + b.Run(name, func(b *testing.B) { + var detectionTimes []time.Time + now := time.Now() + for _, ea := range endpointAges { + detectionTimes = append(detectionTimes, now.Add(-ea)) + } + + var eps []string + for i := range endpointAges { + eps = append(eps, string('a'+rune(i))) + } + + route := &routing.Route{ + LBFadeInDuration: fadeInDuration, + LBFadeInExponent: 1, + } + registry := routing.NewEndpointRegistry(routing.RegistryOptions{}) + for i := range eps { + route.LBEndpoints = append(route.LBEndpoints, routing.LBEndpoint{ + Host: eps[i], + Detected: detectionTimes[i], + }) + registry.SetDetectedTime(eps[i], detectionTimes[i]) + } + + var wg sync.WaitGroup + + // Emulate the load balancer loop, sending requests to it with random hash keys + // over and over again till fadeIn period is over. + b.ResetTimer() + for i := 0; i < clients; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + ctx := &routing.LBContext{ + Params: map[string]interface{}{}, + Registry: registry, + } + + for j := 0; j < b.N/clients; j++ { + ctx.Params[loadbalancer.ConsistentHashKey] = strconv.Itoa(rnd.Intn(100000)) + _ = setRequestURLForLoadBalancedBackend(&url.URL{}, route, ctx) + } + }(i) + } + + wg.Wait() + }) +} + +func repeatedSlice(v time.Duration, n int) []time.Duration { + var s []time.Duration + for i := 0; i < n; i++ { + s = append(s, v) + } + return s +} + +func BenchmarkFadeIn(b *testing.B) { + old := 2 * fadeInDuration + clients := []int{1, 4, 16, 64, 256} + for _, c := range clients { + benchmarkFadeIn(b, fmt.Sprintf("round-robin, 11, %d clients", c), c, repeatedSlice(old, 200)...) + } +} + func BenchmarkAccessLogNoFilter(b *testing.B) { benchmarkAccessLog(b, "", 200) } func BenchmarkAccessLogDisablePrint(b *testing.B) { benchmarkAccessLog(b, "disableAccessLog(1,3)", 200) diff --git a/routing/datasource.go b/routing/datasource.go index 6879b96650..c89a545a0a 100644 --- a/routing/datasource.go +++ b/routing/datasource.go @@ -3,6 +3,7 @@ package routing import ( "errors" "fmt" + "math/rand" "net/url" "sort" "sync" @@ -486,7 +487,7 @@ func processRouteDef(cpm map[string]PredicateSpec, fr filters.Registry, def *esk return nil, err } - r := &Route{Route: *def, Scheme: scheme, Host: host, Predicates: cps, Filters: fs, weight: weight} + r := &Route{Route: *def, Scheme: scheme, Host: host, Predicates: cps, Filters: fs, weight: weight, Rnd: rand.New(NewLockedSource())} // #nosec if err := processTreePredicates(r, def.Predicates); err != nil { return nil, err } diff --git a/loadbalancer/locked_source.go b/routing/locked_source.go similarity index 85% rename from loadbalancer/locked_source.go rename to routing/locked_source.go index a662a35c4d..7b17b39a6c 100644 --- a/loadbalancer/locked_source.go +++ b/routing/locked_source.go @@ -1,4 +1,4 @@ -package loadbalancer +package routing import ( "math/rand" @@ -11,7 +11,7 @@ type lockedSource struct { r rand.Source } -func newLockedSource() *lockedSource { +func NewLockedSource() *lockedSource { return &lockedSource{r: rand.NewSource(time.Now().UnixNano())} } diff --git a/loadbalancer/locked_source_test.go b/routing/locked_source_test.go similarity index 87% rename from loadbalancer/locked_source_test.go rename to routing/locked_source_test.go index c3cfa68df9..2d8501037b 100644 --- a/loadbalancer/locked_source_test.go +++ b/routing/locked_source_test.go @@ -1,4 +1,4 @@ -package loadbalancer +package routing import ( "sync" @@ -12,7 +12,7 @@ func loadTestLockedSource(s *lockedSource, n int) { } func TestLockedSourceForConcurrentUse(t *testing.T) { - s := newLockedSource() + s := NewLockedSource() var wg sync.WaitGroup for i := 0; i < 10; i++ { diff --git a/routing/routing.go b/routing/routing.go index f47c498322..9347a25feb 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -3,6 +3,7 @@ package routing import ( "encoding/json" "fmt" + "math/rand" "net/http" "strconv" "strings" @@ -190,10 +191,10 @@ type LBAlgorithm interface { // LBContext is used to pass data to the load balancer to decide based // on that data which endpoint to call from the backends type LBContext struct { - Request *http.Request - Route *Route - Params map[string]interface{} - Registry *EndpointRegistry + Request *http.Request + Params map[string]interface{} + Registry *EndpointRegistry + Endpoints []LBEndpoint } // NewLBContext is used to create a new LBContext, to pass data to the @@ -202,7 +203,6 @@ type LBContext struct { func NewLBContext(r *http.Request, rt *Route) *LBContext { return &LBContext{ Request: r, - Route: rt, } } @@ -221,6 +221,9 @@ type Route struct { // path predicate matching a subtree pathSubtree string + // random number generator used for LB algorithm + Rnd *rand.Rand + // The backend scheme and host. Scheme, Host string diff --git a/skptesting/run_fadein_test.sh b/skptesting/run_fadein_test.sh index 77b7c521af..c27da51e63 100755 --- a/skptesting/run_fadein_test.sh +++ b/skptesting/run_fadein_test.sh @@ -1,7 +1,7 @@ #!/bin/bash function run_test() { - GO111MODULE=on go test ./loadbalancer -run="$1" -count=1 -v | awk '/fadein_test.go:[0-9]+: CSV/ {print $3}' + GO111MODULE=on go test ./proxy -run="$1" -count=1 -v | awk '/proxy_test.go:[0-9]+: CSV/ {print $3}' } cwd=$( dirname "${BASH_SOURCE[0]}" )