Skip to content

Commit

Permalink
wip: passive health checks
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Zavodskikh <[email protected]>
  • Loading branch information
Roman Zavodskikh committed Feb 1, 2024
1 parent ef11aa0 commit 56b9fee
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 9 deletions.
16 changes: 16 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ type Config struct {
OpenPolicyAgentEnvoyMetadata string `yaml:"open-policy-agent-envoy-metadata"`
OpenPolicyAgentCleanerInterval time.Duration `yaml:"open-policy-agent-cleaner-interval"`
OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"`

// Passive Health Checks
EnablePassiveHealthCheck bool `yaml:"enable-passive-health-check"`
PassiveHealthCheckParams mapFlags `yaml:"passive-health-check-params"`
}

const (
Expand Down Expand Up @@ -567,6 +571,14 @@ func NewConfig() *Config {
flag.Var(cfg.LuaModules, "lua-modules", "comma separated list of lua filter modules. Use <module>.<symbol> to selectively enable module symbols, for example: package,base._G,base.print,json")
flag.Var(cfg.LuaSources, "lua-sources", `comma separated list of lua input types for the lua() filter. Valid sources "", "file", "inline", "file,inline" and "none". Use "file" to only allow lua file references in lua filter. Default "" is the same as "file","inline". Use "none" to disable lua filters.`)

// Passive Health Checks
flag.BoolVar(&cfg.EnablePassiveHealthCheck, "enable-passive-health-check", false, "enables passive health check")
cfg.PassiveHealthCheckParams = mapFlags{values: map[string]string{
"total-requests-threshold": "10",
"timed-out-requests-ratio-threshold": "0.1",
}}
flag.Var(&cfg.PassiveHealthCheckParams, "passive-health-check-params", "sets the parameters for passive health check feature")

cfg.flags = flag
return cfg
}
Expand Down Expand Up @@ -906,6 +918,10 @@ func (c *Config) ToOptions() skipper.Options {
OpenPolicyAgentEnvoyMetadata: c.OpenPolicyAgentEnvoyMetadata,
OpenPolicyAgentCleanerInterval: c.OpenPolicyAgentCleanerInterval,
OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout,

// Passive Health Checks
EnablePassiveHealthCheck: c.EnablePassiveHealthCheck,
PassiveHealthCheckParams: c.PassiveHealthCheckParams.values,
}
for _, rcci := range c.CloneRoute {
eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl)
Expand Down
4 changes: 4 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func defaultConfig() *Config {
LuaSources: commaListFlag(),
OpenPolicyAgentCleanerInterval: 10 * time.Second,
OpenPolicyAgentStartupTimeout: 30 * time.Second,
PassiveHealthCheckParams: mapFlags{values: map[string]string{
"total-requests-threshold": "10",
"timed-out-requests-ratio-threshold": "0.1",
}},
}
}

Expand Down
13 changes: 13 additions & 0 deletions docs/operation/operation.md
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,19 @@ to get the results paginated or getting all of them at the same time.
curl localhost:9911/routes?offset=200&limit=100
```

## Passive health check

Skipper has an option to automatically detect and mitigate faulty backend enpoints, this feature is called
Passive Health Check(PHC). To enable this feature, you need to provide `-enable-passive-health-check` command line flag to Skipper.
Given only this flag, Skipper will try to provide PHC functionality with default parameters.
You can configure those parameters with `-passive-health-check-params` command line flag,
like this: `-passive-health-check-params=total-requests-threshold=10,timed-out-requests-ratio-threshold=0.1`.

The possible sub-options for `-passive-health-check-params` are:
+ `total-requests-threshold=<int>` - the minimum number of requests per minute per backend pod required to activate PHC for this pod
+ `timed-out-requests-ratio-threshold=<float>` - the minimum ratio of failed requests for every backend pod to make PHC avoid
sending requests to this pod

## Memory consumption

While Skipper is generally not memory bound, some features may require
Expand Down
4 changes: 4 additions & 0 deletions metricsinit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func TestInitOrderAndDefault(t *testing.T) {
SwarmRedisURLs: []string{fmt.Sprintf("localhost:%d", redisPort)},
EnableRatelimiters: true,
SwarmRedisConnMetricsInterval: ringMetricsUpdatePeriod,
PassiveHealthCheckParams: map[string]string{
"total-requests-threshold": "10",
"timed-out-requests-ratio-threshold": "0.1",
},
}

tornDown := make(chan struct{})
Expand Down
6 changes: 5 additions & 1 deletion proxy/fadein_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ func initializeEndpoints(endpointAges []float64, algorithmName string, fadeInDur
registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}

proxy := &Proxy{registry: registry, fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry}}
proxy := &Proxy{
registry: registry,
fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: registry},
heathlyEndpoints: &healthyEndpoints{enabled: false},
}
return route, proxy, eps
}

Expand Down
35 changes: 35 additions & 0 deletions proxy/healthy_endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package proxy

import "github.com/zalando/skipper/routing"

type healthyEndpoints struct {
enabled bool
timeoutedRequestsRatioThreshold float64
totalRequestsThreshold float64
}

func (h *healthyEndpoints) healthyEndpoint(endpoint routing.LBEndpoint) bool {
timeoutedRequests := float64(endpoint.Metrics.TimeoutedRequests())
totalRequests := float64(endpoint.Metrics.TotalRequests())
failedRequestsRatio := timeoutedRequests / totalRequests

return failedRequestsRatio < h.timeoutedRequestsRatioThreshold || totalRequests < h.totalRequestsThreshold
}

func (h *healthyEndpoints) filterHealthyEndpoints(endpoints []routing.LBEndpoint, rt *routing.Route) []routing.LBEndpoint {
if !h.enabled {
return endpoints
}

filtered := make([]routing.LBEndpoint, 0, len(endpoints))
for _, e := range endpoints {
if h.healthyEndpoint(e) {
filtered = append(filtered, e)
}
}

if len(filtered) == 0 {
return endpoints
}
return filtered
}
104 changes: 104 additions & 0 deletions proxy/healthy_endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package proxy

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/loadbalancer"
"github.com/zalando/skipper/routing"
)

func initiliaze(hosts []string) (*routing.EndpointRegistry, *routing.Route) {
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
eskipRoute := eskip.Route{BackendType: eskip.LBBackend, LBAlgorithm: loadbalancer.PowerOfRandomNChoices.String()}
for i := range hosts {
eskipRoute.LBEndpoints = append(eskipRoute.LBEndpoints, fmt.Sprintf("http://%s", hosts[i]))
}

route := &routing.Route{
Route: eskipRoute,
LBEndpoints: []routing.LBEndpoint{},
}
rt := loadbalancer.NewAlgorithmProvider().Do([]*routing.Route{route})
route = rt[0]
endpointRegistry.Do([]*routing.Route{route})

return endpointRegistry, route
}

func TestFilterHealthyEndpointsReturnsAllEndpointsWhenAllAreHealthy(t *testing.T) {
hosts := []string{"host1:80", "host2:80", "host3:80"}
endpointRegistry, route := initiliaze(hosts)

for i := range hosts {
metrics := endpointRegistry.GetMetrics(hosts[i])
for j := 0; j < 50; j++ {
metrics.IncTotalRequests()
}
}

proxy := &Proxy{
registry: endpointRegistry,
heathlyEndpoints: &healthyEndpoints{enabled: true, timeoutedRequestsRatioThreshold: 0.1, totalRequestsThreshold: 10},
}
endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route)

assert.Equal(t, 3, len(endpoints))
assert.Equal(t, hosts[0], endpoints[0].Host)
assert.Equal(t, hosts[1], endpoints[1].Host)
assert.Equal(t, hosts[2], endpoints[2].Host)
}

func TestFilterHealthyEndpointsReturnsAllEndpointsWhenAllAreUnhealthy(t *testing.T) {
hosts := []string{"host1:80", "host2:80", "host3:80"}
endpointRegistry, route := initiliaze(hosts)

for i := range hosts {
metrics := endpointRegistry.GetMetrics(hosts[i])
for j := 0; j < 50; j++ {
metrics.IncTotalRequests()
}
for j := 0; j < 30; j++ {
metrics.IncTimeoutedRequests()
}
}

proxy := &Proxy{
registry: endpointRegistry,
heathlyEndpoints: &healthyEndpoints{enabled: true, timeoutedRequestsRatioThreshold: 0.1, totalRequestsThreshold: 10},
}
endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route)

assert.Equal(t, 3, len(endpoints))
assert.Equal(t, hosts[0], endpoints[0].Host)
assert.Equal(t, hosts[1], endpoints[1].Host)
assert.Equal(t, hosts[2], endpoints[2].Host)
}

func TestFilterHealthyEndpointsFiltersUnhealthyEndpoints(t *testing.T) {
hosts := []string{"host1:80", "host2:80", "host3:80"}
endpointRegistry, route := initiliaze(hosts)

for i := range hosts {
metrics := endpointRegistry.GetMetrics(hosts[i])
for j := 0; j < 50; j++ {
metrics.IncTotalRequests()
}
}
for i := 0; i < 30; i++ {
metrics := endpointRegistry.GetMetrics(hosts[0])
metrics.IncTimeoutedRequests()
}

proxy := &Proxy{
registry: endpointRegistry,
heathlyEndpoints: &healthyEndpoints{enabled: true, timeoutedRequestsRatioThreshold: 0.1, totalRequestsThreshold: 10},
}
endpoints := proxy.heathlyEndpoints.filterHealthyEndpoints(route.LBEndpoints, route)

assert.Equal(t, 2, len(endpoints))
assert.Equal(t, hosts[1], endpoints[0].Host)
assert.Equal(t, hosts[2], endpoints[1].Host)
}
89 changes: 86 additions & 3 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ const (
// DefaultExpectContinueTimeout, the default timeout to expect
// a response for a 100 Continue request
DefaultExpectContinueTimeout = 30 * time.Second

// DefaultHealthyEndpointsTotalRequestsThreshold, the default minimum amount of
// requests to the single endpoint for healthy endpoints checker to considering
// opting endpoint out
DefaultHealthyEndpointsTotalRequestsThreshold = 10

// DefaultHealthyEndpointsTimeoutedRequestsRatioThreshold, the default minimum ratio of
// timeouted requests to total requests to the single endpoint for healthy endpoints
// checker to considering opting endpoint out
DefaultHealthyEndpointsTimeoutedRequestsRatioThreshold = 0.1
)

// Flags control the behavior of the proxy.
Expand Down Expand Up @@ -145,6 +155,50 @@ type OpenTracingParams struct {
ExcludeTags []string
}

type PassiveHealthCheckParams struct {
// The minimum number of total requests that should be sent to an endpoint to healthy endpoints checker
// potentially to opt out the endpoint from the list of healthy endpoints
TotalRequestsThreshold int

// The minimum ratio of timed out requests to total requests that should be sent to an endpoint to healthy
// endpoints checker to opt out the endpoint from the list of healthy endpoints
TimedOutRequestsRatioThreshold float64
}

func InitPassiveHealthChecker(o map[string]string) (*PassiveHealthCheckParams, error) {
result := &PassiveHealthCheckParams{
TotalRequestsThreshold: DefaultHealthyEndpointsTotalRequestsThreshold,
TimedOutRequestsRatioThreshold: DefaultHealthyEndpointsTimeoutedRequestsRatioThreshold,
}

for key, value := range o {
switch key {
case "total-requests-threshold":
totalRequestsThreshold, err := strconv.Atoi(value)
if err != nil {
return nil, fmt.Errorf("passive health check: invalid totalRequestsThreshold value: %s", value)
}
if totalRequestsThreshold < 0 {
return nil, fmt.Errorf("passive health check: invalid totalRequestsThreshold value: %s", value)
}
result.TotalRequestsThreshold = totalRequestsThreshold
case "timed-out-requests-ratio-threshold":
timedOutRequestsRatioThreshold, err := strconv.ParseFloat(value, 64)
if err != nil {
return nil, fmt.Errorf("passive health check: invalid timedOutRequestsRatioThreshold value: %s", value)
}
if timedOutRequestsRatioThreshold < 0 || timedOutRequestsRatioThreshold > 1 {
return nil, fmt.Errorf("passive health check: invalid timedOutRequestsRatioThreshold value: %s", value)
}
result.TimedOutRequestsRatioThreshold = timedOutRequestsRatioThreshold
default:
return nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value)
}
}

return result, nil
}

// Proxy initialization options.
type Params struct {
// The proxy expects a routing instance that is used to match
Expand Down Expand Up @@ -250,6 +304,12 @@ type Params struct {
// and returns some metadata about endpoint. Information about the metadata
// returned from the registry could be found in routing.Metrics interface.
EndpointRegistry *routing.EndpointRegistry

// EnablePassiveHealthCheck enables the healthy endpoints checker
EnablePassiveHealthCheck bool

// PassiveHealthCheckParams defines the parameters for the healthy endpoints checker.
PassiveHealthCheck *PassiveHealthCheckParams
}

type (
Expand Down Expand Up @@ -327,6 +387,7 @@ type Proxy struct {
routing *routing.Routing
registry *routing.EndpointRegistry
fadein *fadeIn
heathlyEndpoints *healthyEndpoints
roundTripper http.RoundTripper
priorityRoutes []PriorityRoute
flags Flags
Expand Down Expand Up @@ -470,6 +531,7 @@ func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint {
rt := ctx.route
endpoints := rt.LBEndpoints
endpoints = p.fadein.filterFadeIn(endpoints, rt)
endpoints = p.heathlyEndpoints.filterHealthyEndpoints(endpoints, rt)

lbctx := &routing.LBContext{
Request: ctx.request,
Expand Down Expand Up @@ -722,10 +784,26 @@ func WithParams(p Params) *Proxy {

hostname := os.Getenv("HOSTNAME")

timeoutedRequestsRatioThreshold := DefaultHealthyEndpointsTimeoutedRequestsRatioThreshold
totalRequestsThreshold := DefaultHealthyEndpointsTotalRequestsThreshold
if p.PassiveHealthCheck != nil {
timeoutedRequestsRatioThreshold = p.PassiveHealthCheck.TimedOutRequestsRatioThreshold
totalRequestsThreshold = p.PassiveHealthCheck.TotalRequestsThreshold
}
healthyEndpoints := &healthyEndpoints{
enabled: p.EnablePassiveHealthCheck,
timeoutedRequestsRatioThreshold: timeoutedRequestsRatioThreshold,
totalRequestsThreshold: float64(totalRequestsThreshold),
}

return &Proxy{
routing: p.Routing,
registry: p.EndpointRegistry,
fadein: &fadeIn{rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: p.EndpointRegistry},
routing: p.Routing,
registry: p.EndpointRegistry,
fadein: &fadeIn{
rnd: rand.New(loadbalancer.NewLockedSource()),
endpointRegistry: p.EndpointRegistry,
},
heathlyEndpoints: healthyEndpoints,
roundTripper: p.CustomHttpRoundTripperWrap(tr),
priorityRoutes: p.PriorityRoutes,
flags: p.Flags,
Expand Down Expand Up @@ -854,6 +932,8 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
if endpointMetrics != nil {
endpointMetrics.IncInflightRequest()
defer endpointMetrics.DecInflightRequest()

endpointMetrics.IncTotalRequests()
}

if p.experimentalUpgrade && isUpgradeRequest(req) {
Expand All @@ -865,6 +945,9 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co

roundTripper, err := p.getRoundTripper(ctx, req)
if err != nil {
if endpointMetrics != nil {
endpointMetrics.IncTimeoutedRequests()
}
return nil, &proxyError{err: fmt.Errorf("failed to get roundtripper: %w", err), code: http.StatusBadGateway}
}

Expand Down
Loading

0 comments on commit 56b9fee

Please sign in to comment.