From e0dd15592c46056570bf9ada5f37f95eb6dc2984 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Mon, 14 Aug 2023 17:51:53 +0200 Subject: [PATCH] fix: inode admission controllers should not count shedded responses by other admission controllers (#2443) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test: create test that check the leaf admission controllers will not interfere with inode admission controllers in the call tree test: fix flaky test (works on my machine) Signed-off-by: Sandor Szücs --- filters/shedder/admission.go | 25 +++-- filters/shedder/admission_test.go | 161 +++++++++++++++++++++++++++++- 2 files changed, 175 insertions(+), 11 deletions(-) diff --git a/filters/shedder/admission.go b/filters/shedder/admission.go index d884522aa6..c963330d8c 100644 --- a/filters/shedder/admission.go +++ b/filters/shedder/admission.go @@ -83,12 +83,14 @@ func (m mode) String() string { } const ( - counterPrefix = "shedder.admission_control." - admissionControlSpanName = "admission_control" - admissionControlKey = "shedder:admission_control" - admissionControlValue = "reject" - minWindowSize = 1 - maxWindowSize = 100 + counterPrefix = "shedder.admission_control." + admissionControlSpanName = "admission_control" + admissionSignalHeaderKey = "Admission-Control" + admissionSignalHeaderValue = "true" + admissionControlKey = "shedder:admission_control" + admissionControlValue = "reject" + minWindowSize = 1 + maxWindowSize = 100 ) type Options struct { @@ -422,18 +424,27 @@ func (ac *admissionControl) Request(ctx filters.FilterContext) { if ac.mode != active { return } + + header := make(http.Header) + header.Set(admissionSignalHeaderKey, admissionSignalHeaderValue) ctx.Serve(&http.Response{ + Header: header, StatusCode: http.StatusServiceUnavailable, }) } } func (ac *admissionControl) Response(ctx filters.FilterContext) { - // we don't want to count ourselves + // we don't want to count our short cutted responses as errors if ctx.StateBag()[admissionControlKey] == admissionControlValue { return } + // we don't want to count other shedders in the call path as errors + if ctx.Response().Header.Get(admissionSignalHeaderKey) == admissionSignalHeaderValue { + return + } + if ctx.Response().StatusCode < 499 { ac.successCounter.Add(1) } diff --git a/filters/shedder/admission_test.go b/filters/shedder/admission_test.go index 8066261111..0377feedf7 100644 --- a/filters/shedder/admission_test.go +++ b/filters/shedder/admission_test.go @@ -2,25 +2,30 @@ package shedder import ( "fmt" + "io" "math/rand" "net/http" "net/http/httptest" + "net/url" "sort" + "sync" + "sync/atomic" "testing" "time" "github.com/zalando/skipper/eskip" "github.com/zalando/skipper/filters" "github.com/zalando/skipper/filters/builtin" + "github.com/zalando/skipper/metrics" + "github.com/zalando/skipper/metrics/metricstest" + "github.com/zalando/skipper/net" "github.com/zalando/skipper/proxy/proxytest" "github.com/zalando/skipper/routing" "github.com/zalando/skipper/routing/testdataclient" + "golang.org/x/time/rate" ) func TestAdmissionControl(t *testing.T) { - // fixed rand to have non flaky tests - rand := rand.New(rand.NewSource(5)) - for _, ti := range []struct { msg string mode string @@ -92,7 +97,7 @@ func TestAdmissionControl(t *testing.T) { exponent: 1.0, N: 20, pBackendErr: 0.2, - pExpectedAdmissionShedding: 0.1, + pExpectedAdmissionShedding: 0.15, }, { msg: "medium error rate and bigger than threshold will block some traffic", mode: "active", @@ -117,10 +122,37 @@ func TestAdmissionControl(t *testing.T) { N: 20, pBackendErr: 0.8, pExpectedAdmissionShedding: 0.91, + }, { + msg: "inactive mode with large error rate and bigger than threshold will pass all traffic", + mode: "inactive", + d: 10 * time.Millisecond, + windowsize: 5, + minRequests: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + exponent: 1.0, + N: 20, + pBackendErr: 0.8, + pExpectedAdmissionShedding: 0.0, + }, { + msg: "logInactive mode with large error rate and bigger than threshold will pass all traffic", + mode: "logInactive", + d: 10 * time.Millisecond, + windowsize: 5, + minRequests: 10, + successThreshold: 0.9, + maxrejectprobability: 0.95, + exponent: 1.0, + N: 20, + pBackendErr: 0.8, + pExpectedAdmissionShedding: 0.0, }} { t.Run(ti.msg, func(t *testing.T) { + var mu sync.Mutex backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() p := rand.Float64() + mu.Unlock() if p < ti.pBackendErr { w.WriteHeader(http.StatusInternalServerError) } else { @@ -208,6 +240,127 @@ func TestAdmissionControl(t *testing.T) { } } +func TestAdmissionControlChainOnlyBackendErrorPass(t *testing.T) { + dm := metrics.Default + t.Cleanup(func() { metrics.Default = dm }) + m := &metricstest.MockMetrics{} + metrics.Default = m + defer m.Close() + + // backend with 50% error rate + var cnt int64 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + i := atomic.AddInt64(&cnt, 1) + if i%2 == 0 { + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + })) + defer backend.Close() + + spec := NewAdmissionControl(Options{}).(*AdmissionControlSpec) + + argsLeaf := make([]interface{}, 0, 6) + argsLeaf = append(argsLeaf, "testmetric-leaf", "active", "5ms", 5, 5, 0.9, 0.95, 1.0) + fLeaf, err := spec.CreateFilter(argsLeaf) + if err != nil { + t.Fatalf("error creating leaf filter %q: %v", argsLeaf, err) + return + } + defer fLeaf.(*admissionControl).Close() + + args := make([]interface{}, 0, 6) + args = append(args, "testmetric", "active", "50ms", 10, 10, 0.9, 0.95, 1.0) + f, err := spec.CreateFilter(args) + if err != nil { + t.Fatalf("error creating filter %q: %v", args, err) + return + } + defer f.(*admissionControl).Close() + + fr := make(filters.Registry) + fr.Register(spec) + + rLeaf := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: argsLeaf}}, Backend: backend.URL} + proxyLeaf := proxytest.New(fr, rLeaf) + defer proxyLeaf.Close() + + r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: proxyLeaf.URL} + proxy := proxytest.New(fr, r) + defer proxy.Close() + + reqURL, err := url.Parse(proxy.URL) + if err != nil { + t.Fatalf("Failed to parse url %s: %v", proxy.URL, err) + } + + rt := net.NewTransport(net.Options{}) + defer rt.Close() + + req, err := http.NewRequest("GET", reqURL.String(), nil) + if err != nil { + t.Error(err) + return + } + + N := 1000 + var wg sync.WaitGroup + wg.Add(N) + lim := rate.NewLimiter(50, 50) + now := time.Now() + for i := 0; i < N; i++ { + r := lim.Reserve() + for !r.OK() { + time.Sleep(10 * time.Microsecond) + } + go func(i int) { + defer r.Cancel() + defer wg.Done() + rsp, err := rt.RoundTrip(req) + if err != nil { + t.Logf("roundtrip %d: %v", i, err) + } else { + io.Copy(io.Discard, rsp.Body) + rsp.Body.Close() + } + }(i) + } + wg.Wait() + t.Logf("test took: %s", time.Since(now)) + + var total, totalLeaf, rejects, rejectsLeaf int64 + m.WithCounters(func(counters map[string]int64) { + t.Logf("counters: %+v", counters) + var ok bool + + total, ok = counters["shedder.admission_control.total.testmetric"] + if !ok { + t.Error("Failed to find total shedder data") + } + + rejectsLeaf, ok = counters["shedder.admission_control.reject.testmetric-leaf"] + if !ok { + t.Error("Failed to find rejectsLeaf shedder data") + } + + totalLeaf, ok = counters["shedder.admission_control.total.testmetric-leaf"] + if !ok { + t.Error("Failed to find totalLeaf shedder data") + } + }) + + t.Logf("N: %d", N) + t.Logf("totalLeaf: %d, rejectLeaf: %d", totalLeaf, rejectsLeaf) + t.Logf("total: %d, reject: %d", total, rejects) + + epsilon := 0.05 + maxExpectedFails := int64(epsilon * float64(N)) // maybe 5% instead of 0.1% + if rejects > maxExpectedFails { + t.Fatalf("Failed to get less rejects as wanted: %d > %d", rejects, maxExpectedFails) + } +} + func TestAdmissionControlCleanupModes(t *testing.T) { for _, ti := range []struct { msg string