Skip to content

Commit

Permalink
Improves consistent hash endpoint selection strategy (#1715)
Browse files Browse the repository at this point in the history
See #1712

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov authored Feb 12, 2021
1 parent cf468bb commit ff61a38
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 16 deletions.
51 changes: 36 additions & 15 deletions loadbalancer/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"math/rand"
"net/url"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -190,31 +191,51 @@ func (r *random) Apply(ctx *routing.LBContext) routing.LBEndpoint {
return withFadeIn(r.rand, ctx, r.notFadingIndexes, r.fadingWeights, i)
}

type consistentHash struct{}
type (
endpointHash struct {
index int // index of endpoint in endpoint list
hash uint32 // hash of endpoint
}
consistentHash []endpointHash // list of endpoints sorted by hash value
)

func (ch consistentHash) Len() int { return len(ch) }
func (ch consistentHash) Less(i, j int) bool { return ch[i].hash < ch[j].hash }
func (ch consistentHash) Swap(i, j int) { ch[i], ch[j] = ch[j], ch[i] }

func newConsistentHash(endpoints []string) routing.LBAlgorithm {
return &consistentHash{}
ch := consistentHash(make([]endpointHash, len(endpoints)))
for i, ep := range endpoints {
ch[i] = endpointHash{i, hash(ep)}
}
sort.Sort(ch)
return ch
}

func hash(s string) uint32 {
h := fnv.New32a()
h.Write([]byte(s))
return h.Sum32()
}

// Returns index of endpoint with closest hash to key's hash
func (ch consistentHash) search(key string) int {
h := hash(key)
i := sort.Search(ch.Len(), func(i int) bool { return ch[i].hash >= h })
if i == ch.Len() { // rollover
i = 0
}
return ch[i].index
}

// Apply implements routing.LBAlgorithm with a consistent hash algorithm.
func (ch *consistentHash) Apply(ctx *routing.LBContext) routing.LBEndpoint {
func (ch consistentHash) Apply(ctx *routing.LBContext) routing.LBEndpoint {
if len(ctx.Route.LBEndpoints) == 1 {
return ctx.Route.LBEndpoints[0]
}

var sum uint32
h := fnv.New32()

key := net.RemoteHost(ctx.Request).String()
if _, err := h.Write([]byte(key)); err != nil {
log.Errorf("Failed to write '%s' into hash: %v", key, err)
return ctx.Route.LBEndpoints[rand.Intn(len(ctx.Route.LBEndpoints))] // #nosec
}
sum = h.Sum32()
choice := int(sum) % len(ctx.Route.LBEndpoints)
if choice < 0 {
choice = len(ctx.Route.LBEndpoints) + choice
}
choice := ch.search(key)

return ctx.Route.LBEndpoints[choice]
}
Expand Down
30 changes: 29 additions & 1 deletion loadbalancer/algorithm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestSelectAlgorithm(t *testing.T) {
t.Fatal("failed to set the endpoints")
}

if _, ok := rr[0].LBAlgorithm.(*consistentHash); !ok {
if _, ok := rr[0].LBAlgorithm.(consistentHash); !ok {
t.Fatal("failed to set the right algorithm")
}
})
Expand Down Expand Up @@ -282,3 +282,31 @@ func TestApply(t *testing.T) {
})
}
}

func TestConsistentHash(t *testing.T) {
apply := func(key string, endpoints []string) string {
ch := newConsistentHash(endpoints).(consistentHash)
return endpoints[ch.search(key)]
}

endpoints := []string{"http://127.0.0.1:8080", "http://127.0.0.2:8080", "http://127.0.0.3:8080"}
const key = "192.168.0.1"

ep := apply(key, endpoints)

// remove endpoint
endpoints = endpoints[1:]

ep1 := apply(key, endpoints)
if ep != ep1 {
t.Errorf("expected to select %s, got %s", ep, ep1)
}

// add endpoint
endpoints = append(endpoints, "http://127.0.0.4:8080")

ep2 := apply(key, endpoints)
if ep != ep2 {
t.Errorf("expected to select %s, got %s", ep, ep2)
}
}

0 comments on commit ff61a38

Please sign in to comment.