Skip to content

Commit

Permalink
phc: test multiple unhealthy endpoints (#3058)
Browse files Browse the repository at this point in the history
use vegeta with all algorithms except `consistentHash` as it's a bit tricky to set different headers for attacks

Signed-off-by: Mustafa Abdelrahman <[email protected]>
  • Loading branch information
MustafaSaber authored May 7, 2024
1 parent 75c404b commit d563b7c
Showing 1 changed file with 151 additions and 50 deletions.
201 changes: 151 additions & 50 deletions proxy/healthy_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package proxy

import (
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -10,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zalando/skipper/metrics/metricstest"
zhttptest "github.com/zalando/skipper/net/httptest"
"github.com/zalando/skipper/routing"
)

Expand All @@ -22,8 +25,8 @@ func defaultEndpointRegistry() *routing.EndpointRegistry {
return routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: period,
MinRequests: 10,
MaxHealthCheckDropProbability: 1.0,
MinRequests: 2,
MaxHealthCheckDropProbability: 0.95,
MinHealthCheckDropProbability: 0.01,
})
}
Expand All @@ -49,8 +52,18 @@ func sendGetRequests(t *testing.T, ps *httptest.Server) (failed int) {
return
}

func fireVegeta(t *testing.T, ps *httptest.Server, freq int, per time.Duration, timeout time.Duration) *zhttptest.VegetaAttacker {
t.Helper()
va := zhttptest.NewVegetaAttacker(ps.URL, freq, per, timeout)
va.Attack(io.Discard, 5*time.Second, t.Name())
return va
}

func setupProxy(t *testing.T, doc string) (*metricstest.MockMetrics, *httptest.Server) {
endpointRegistry := defaultEndpointRegistry()
return setupProxyWithCustomEndpointRegisty(t, doc, defaultEndpointRegistry())
}

func setupProxyWithCustomEndpointRegisty(t *testing.T, doc string, endpointRegistry *routing.EndpointRegistry) (*metricstest.MockMetrics, *httptest.Server) {
m := &metricstest.MockMetrics{}

tp, err := newTestProxyWithParams(doc, Params{
Expand All @@ -68,20 +81,33 @@ func setupProxy(t *testing.T, doc string) (*metricstest.MockMetrics, *httptest.S
return m, ps
}

func TestPHCWithoutRequests(t *testing.T) {
services := []*httptest.Server{}
for i := 0; i < 3; i++ {
func setupServices(t *testing.T, healthy, unhealthy int) []string {
services := []string{}
for i := 0; i < healthy; i++ {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
defer service.Close()
services = append(services, service.URL)
t.Cleanup(service.Close)
}
for i := 0; i < unhealthy; i++ {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
w.WriteHeader(http.StatusOK)
}))
services = append(services, service.URL)
t.Cleanup(service.Close)
}
return services
}

func TestPHCWithoutRequests(t *testing.T) {
services := setupServices(t, 3, 0)

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
algorithm, services[0], services[1], services[2]))
rsp := sendGetRequest(t, ps, 0)
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()
Expand All @@ -93,7 +119,7 @@ func TestPHCWithoutRequests(t *testing.T) {

t.Run("consistent hash with balance factor", func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
services[0], services[1], services[2]))
rsp := sendGetRequest(t, ps, 0)
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()
Expand All @@ -104,13 +130,10 @@ func TestPHCWithoutRequests(t *testing.T) {
}

func TestPHCForSingleHealthyEndpoint(t *testing.T) {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer service.Close()
service := setupServices(t, 1, 0)[0]
endpointRegistry := defaultEndpointRegistry()

doc := fmt.Sprintf(`* -> "%s"`, service.URL)
doc := fmt.Sprintf(`* -> "%s"`, service)
tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
Expand All @@ -128,63 +151,141 @@ func TestPHCForSingleHealthyEndpoint(t *testing.T) {
}

func TestPHCForMultipleHealthyEndpoints(t *testing.T) {
services := []*httptest.Server{}
for i := 0; i < 3; i++ {
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
defer service.Close()
}
services := setupServices(t, 3, 0)

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
failedReqs := sendGetRequests(t, ps)
assert.Equal(t, 0, failedReqs)
algorithm, services[0], services[1], services[2]))
va := fireVegeta(t, ps, 3000, 1*time.Second, 5*time.Second)
count200, ok := va.CountStatus(200)
assert.True(t, ok)
assert.InDelta(t, count200, 15000, 50) // 3000*5s, the delta is for CI
assert.Equal(t, count200, int(va.TotalRequests()))
})
}

t.Run("consistent hash with balance factor", func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
failedReqs := sendGetRequests(t, ps)
assert.Equal(t, 0, failedReqs)
services[0], services[1], services[2]))
va := fireVegeta(t, ps, 3000, 1*time.Second, 5*time.Second)
count200, ok := va.CountStatus(200)
assert.True(t, ok)
assert.Equal(t, count200, int(va.TotalRequests()))
})
}

func TestPHCForMultipleHealthyAndOneUnhealthyEndpoints(t *testing.T) {
services := []*httptest.Server{}
for i := 0; i < 3; i++ {
serviceNum := i
service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if serviceNum == 0 {
// emulating unhealthy endpoint
time.Sleep(100 * time.Millisecond)
}
w.WriteHeader(http.StatusOK)
}))
services = append(services, service)
defer service.Close()
}

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
services := setupServices(t, 2, 1)
for _, algorithm := range []string{"random", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
mockMetrics, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
algorithm, services[0], services[1], services[2]))

va := fireVegeta(t, ps, 3000, 1*time.Second, 5*time.Second)

count200, ok := va.CountStatus(200)
assert.True(t, ok)

count504, ok := va.CountStatus(504)
assert.True(t, ok)

failedRequests := math.Abs(float64(va.TotalRequests())) - float64(count200)
t.Logf("total requests: %d, count200: %d, count504: %d, failedRequests: %f", va.TotalRequests(), count200, count504, failedRequests)

assert.InDelta(t, float64(count504), failedRequests, 5)
assert.InDelta(t, 0, float64(failedRequests), 0.1*float64(va.TotalRequests()))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(nRequests)) // allow 30% error
assert.InDelta(t, float64(va.TotalRequests()), float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(va.TotalRequests())) // allow 30% error
})
})
}

t.Run("consistentHash", func(t *testing.T) {
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: 1 * time.Second,
MinRequests: 1, // with 2 test case fails on github actions
MaxHealthCheckDropProbability: 0.95,
MinHealthCheckDropProbability: 0.01,
})
mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <consistentHash, "%s", "%s", "%s">`,
services[0], services[1], services[2]), endpointRegistry)
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(nRequests)) // allow 30% error
})
})

t.Run("consistent hash with balance factor", func(t *testing.T) {
_, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
mockMetrics, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0], services[1], services[2]))
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(nRequests)) // allow 30% error
})
})
}

func TestPHCForMultipleHealthyAndMultipleUnhealthyEndpoints(t *testing.T) {
services := setupServices(t, 2, 2)
for _, algorithm := range []string{"random", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
mockMetrics, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s", "%s">`,
algorithm, services[0], services[1], services[2], services[3]))

va := fireVegeta(t, ps, 3000, 1*time.Second, 5*time.Second)

count200, ok := va.CountStatus(200)
assert.True(t, ok)

count504, ok := va.CountStatus(504)
assert.True(t, ok)

failedRequests := math.Abs(float64(va.TotalRequests())) - float64(count200)
t.Logf("total requests: %d, count200: %d, count504: %d, failedRequests: %f", va.TotalRequests(), count200, count504, failedRequests)

assert.InDelta(t, float64(count504), failedRequests, 5)
assert.InDelta(t, 0, float64(failedRequests), 0.3*float64(va.TotalRequests()))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, 2*float64(va.TotalRequests()), float64(counters["passive-health-check.endpoints.dropped"]), 0.6*float64(va.TotalRequests()))
})
})
}

t.Run("consistentHash", func(t *testing.T) {
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: 1 * time.Second,
MinRequests: 1, // with 2 test case fails on github actions and -race
MaxHealthCheckDropProbability: 0.95,
MinHealthCheckDropProbability: 0.01,
})
mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <consistentHash, "%s", "%s", "%s", "%s">`,
services[0], services[1], services[2], services[3]), endpointRegistry)
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.2*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, 2*float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.6*float64(nRequests))
})
})

t.Run("consistent hash with balance factor", func(t *testing.T) {
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{
PassiveHealthCheckEnabled: true,
StatsResetPeriod: 1 * time.Second,
MinRequests: 1, // with 2 test case fails on github actions and -race
MaxHealthCheckDropProbability: 0.95,
MinHealthCheckDropProbability: 0.01,
})
mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s", "%s">`,
services[0], services[1], services[2], services[3]), endpointRegistry)
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.2*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, 2*float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.6*float64(nRequests))
})
})
}

0 comments on commit d563b7c

Please sign in to comment.