Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix (*consistentHash).Apply #2983

Merged
merged 2 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion loadbalancer/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,11 @@ func (ch *consistentHash) Apply(ctx *routing.LBContext) routing.LBEndpoint {
return ctx.LBEndpoints[0]
}

// The index returned from this call is taken from hash ring which is built from data about
// all endpoints, including fading in, unhealthy, etc. ones. The index stored in hash ring is
// the index of the endpoint in the original list of endpoints.
choice := ch.chooseConsistentHashEndpoint(ctx)
return ctx.LBEndpoints[choice]
return ctx.Route.LBEndpoints[choice]
}

func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext) int {
Expand Down
213 changes: 109 additions & 104 deletions proxy/healthy_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package proxy

import (
"fmt"
"math/rand"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/zalando/skipper/loadbalancer"
"github.com/zalando/skipper/routing"
)

Expand Down Expand Up @@ -37,30 +35,36 @@ func TestPHCWithoutRequests(t *testing.T) {
services = append(services, service)
defer service.Close()
}
endpointRegistry := defaultEndpointRegistry()

doc := fmt.Sprintf(`* -> <random, "%s", "%s", "%s">`, services[0].URL, services[1].URL, services[2].URL)
tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
})
if err != nil {
t.Fatal(err)
for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
endpointRegistry := defaultEndpointRegistry()
doc := fmt.Sprintf(`* -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL)

tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
})
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

rsp, err := ps.Client().Get(ps.URL)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()

time.Sleep(10 * period)
/* this test is needed to check PHC will not crash without requests sent during period at all */
})
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

rsp, err := ps.Client().Get(ps.URL)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()

time.Sleep(10 * period)
/* this test is needed to check PHC will not crash without requests sent during period at all */
}

func TestPHCForSingleHealthyEndpoint(t *testing.T) {
Expand Down Expand Up @@ -107,99 +111,100 @@ func TestPHCForMultipleHealthyEndpoints(t *testing.T) {
services = append(services, service)
defer service.Close()
}
endpointRegistry := defaultEndpointRegistry()

doc := fmt.Sprintf(`* -> <random, "%s", "%s", "%s">`, services[0].URL, services[1].URL, services[2].URL)
tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
})
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

failedReqs := 0
for i := 0; i < nRequests; i++ {
rsp, err := ps.Client().Get(ps.URL)
if err != nil {
t.Fatal(err)
}

if rsp.StatusCode != http.StatusOK {
failedReqs++
}
rsp.Body.Close()
}
assert.Equal(t, 0, failedReqs)
}

type roundTripperUnhealthyHost struct {
inner http.RoundTripper
host string
probability float64
rnd *rand.Rand
}

type RoundTripperUnhealthyHostOptions struct {
Host string
Probability float64
}

func (rt *roundTripperUnhealthyHost) RoundTrip(r *http.Request) (*http.Response, error) {
p := rt.rnd.Float64()
if p < rt.probability && r.URL.Host == rt.host {
return nil, fmt.Errorf("roundTrip fail injected")
}

return rt.inner.RoundTrip(r)
}

func newRoundTripperUnhealthyHost(o *RoundTripperUnhealthyHostOptions) func(r http.RoundTripper) http.RoundTripper {
return func(r http.RoundTripper) http.RoundTripper {
return &roundTripperUnhealthyHost{inner: r, rnd: rand.New(loadbalancer.NewLockedSource()), host: o.Host, probability: o.Probability}
for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
endpointRegistry := defaultEndpointRegistry()
doc := fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL)

tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
})
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

failedReqs := 0
for i := 0; i < nRequests; i++ {
req, err := http.NewRequest("GET", ps.URL, nil)
if err != nil {
t.Fatal(err)
}
req.Header.Add("ConsistentHashKey", fmt.Sprintf("%d", i))

rsp, err := ps.Client().Do(req)
if err != nil {
t.Fatal(err)
}

if rsp.StatusCode != http.StatusOK {
failedReqs++
}
rsp.Body.Close()
}
assert.Equal(t, 0, failedReqs)
})
}
}

func TestPHCForMultipleHealthyAndOneUnhealthyEndpoints(t *testing.T) {
services := []*httptest.Server{}
for i := 0; i < 3; i++ {
serviceNum := i
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if serviceNum == 0 {
// emulating unhealthy endpoint
time.Sleep(100 * time.Millisecond)
}
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
defer service.Close()
}
endpointRegistry := defaultEndpointRegistry()

doc := fmt.Sprintf(`* -> <random, "%s", "%s", "%s">`, services[0].URL, services[1].URL, services[2].URL)
tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
CustomHttpRoundTripperWrap: newRoundTripperUnhealthyHost(&RoundTripperUnhealthyHostOptions{Host: services[0].URL[7:], Probability: rtFailureProbability}),
})
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

failedReqs := 0
for i := 0; i < nRequests; i++ {
rsp, err := ps.Client().Get(ps.URL)
if err != nil {
t.Fatal(err)
}

if rsp.StatusCode != http.StatusOK {
failedReqs++
}
rsp.Body.Close()
for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
endpointRegistry := defaultEndpointRegistry()
doc := fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL)

tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
})
if err != nil {
t.Fatal(err)
}
defer tp.close()

ps := httptest.NewServer(tp.proxy)
defer ps.Close()

failedReqs := 0
for i := 0; i < nRequests; i++ {
req, err := http.NewRequest("GET", ps.URL, nil)
if err != nil {
t.Fatal(err)
}
req.Header.Add("ConsistentHashKey", fmt.Sprintf("%d", i))

rsp, err := ps.Client().Do(req)
if err != nil {
t.Fatal(err)
}

if rsp.StatusCode != http.StatusOK {
failedReqs++
}
rsp.Body.Close()
}
RomanZavodskikh marked this conversation as resolved.
Show resolved Hide resolved
assert.InDelta(t, 0.33*rtFailureProbability*(1.0-rtFailureProbability)*float64(nRequests), failedReqs, 0.1*float64(nRequests))
})
}
assert.InDelta(t, 0.33*rtFailureProbability*(1.0-rtFailureProbability)*float64(nRequests), failedReqs, 0.1*float64(nRequests))
}
Loading