Skip to content

Commit

Permalink
fix: inode admission controllers should not count shedded responses b…
Browse files Browse the repository at this point in the history
…y other admission controllers (#2443)

test: create test that check the leaf admission controllers will not interfere with inode admission controllers in the call tree
test: fix flaky test (works on my machine)

Signed-off-by: Sandor Szücs <[email protected]>
  • Loading branch information
szuecs authored Aug 14, 2023
1 parent a241b5f commit e0dd155
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 11 deletions.
25 changes: 18 additions & 7 deletions filters/shedder/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,14 @@ func (m mode) String() string {
}

const (
counterPrefix = "shedder.admission_control."
admissionControlSpanName = "admission_control"
admissionControlKey = "shedder:admission_control"
admissionControlValue = "reject"
minWindowSize = 1
maxWindowSize = 100
counterPrefix = "shedder.admission_control."
admissionControlSpanName = "admission_control"
admissionSignalHeaderKey = "Admission-Control"
admissionSignalHeaderValue = "true"
admissionControlKey = "shedder:admission_control"
admissionControlValue = "reject"
minWindowSize = 1
maxWindowSize = 100
)

type Options struct {
Expand Down Expand Up @@ -422,18 +424,27 @@ func (ac *admissionControl) Request(ctx filters.FilterContext) {
if ac.mode != active {
return
}

header := make(http.Header)
header.Set(admissionSignalHeaderKey, admissionSignalHeaderValue)
ctx.Serve(&http.Response{
Header: header,
StatusCode: http.StatusServiceUnavailable,
})
}
}

func (ac *admissionControl) Response(ctx filters.FilterContext) {
// we don't want to count ourselves
// we don't want to count our short cutted responses as errors
if ctx.StateBag()[admissionControlKey] == admissionControlValue {
return
}

// we don't want to count other shedders in the call path as errors
if ctx.Response().Header.Get(admissionSignalHeaderKey) == admissionSignalHeaderValue {
return
}

if ctx.Response().StatusCode < 499 {
ac.successCounter.Add(1)
}
Expand Down
161 changes: 157 additions & 4 deletions filters/shedder/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,30 @@ package shedder

import (
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"sort"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/builtin"
"github.com/zalando/skipper/metrics"
"github.com/zalando/skipper/metrics/metricstest"
"github.com/zalando/skipper/net"
"github.com/zalando/skipper/proxy/proxytest"
"github.com/zalando/skipper/routing"
"github.com/zalando/skipper/routing/testdataclient"
"golang.org/x/time/rate"
)

func TestAdmissionControl(t *testing.T) {
// fixed rand to have non flaky tests
rand := rand.New(rand.NewSource(5))

for _, ti := range []struct {
msg string
mode string
Expand Down Expand Up @@ -92,7 +97,7 @@ func TestAdmissionControl(t *testing.T) {
exponent: 1.0,
N: 20,
pBackendErr: 0.2,
pExpectedAdmissionShedding: 0.1,
pExpectedAdmissionShedding: 0.15,
}, {
msg: "medium error rate and bigger than threshold will block some traffic",
mode: "active",
Expand All @@ -117,10 +122,37 @@ func TestAdmissionControl(t *testing.T) {
N: 20,
pBackendErr: 0.8,
pExpectedAdmissionShedding: 0.91,
}, {
msg: "inactive mode with large error rate and bigger than threshold will pass all traffic",
mode: "inactive",
d: 10 * time.Millisecond,
windowsize: 5,
minRequests: 10,
successThreshold: 0.9,
maxrejectprobability: 0.95,
exponent: 1.0,
N: 20,
pBackendErr: 0.8,
pExpectedAdmissionShedding: 0.0,
}, {
msg: "logInactive mode with large error rate and bigger than threshold will pass all traffic",
mode: "logInactive",
d: 10 * time.Millisecond,
windowsize: 5,
minRequests: 10,
successThreshold: 0.9,
maxrejectprobability: 0.95,
exponent: 1.0,
N: 20,
pBackendErr: 0.8,
pExpectedAdmissionShedding: 0.0,
}} {
t.Run(ti.msg, func(t *testing.T) {
var mu sync.Mutex
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
p := rand.Float64()
mu.Unlock()
if p < ti.pBackendErr {
w.WriteHeader(http.StatusInternalServerError)
} else {
Expand Down Expand Up @@ -208,6 +240,127 @@ func TestAdmissionControl(t *testing.T) {
}
}

func TestAdmissionControlChainOnlyBackendErrorPass(t *testing.T) {
dm := metrics.Default
t.Cleanup(func() { metrics.Default = dm })
m := &metricstest.MockMetrics{}
metrics.Default = m
defer m.Close()

// backend with 50% error rate
var cnt int64
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
i := atomic.AddInt64(&cnt, 1)
if i%2 == 0 {
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
}))
defer backend.Close()

spec := NewAdmissionControl(Options{}).(*AdmissionControlSpec)

argsLeaf := make([]interface{}, 0, 6)
argsLeaf = append(argsLeaf, "testmetric-leaf", "active", "5ms", 5, 5, 0.9, 0.95, 1.0)
fLeaf, err := spec.CreateFilter(argsLeaf)
if err != nil {
t.Fatalf("error creating leaf filter %q: %v", argsLeaf, err)
return
}
defer fLeaf.(*admissionControl).Close()

args := make([]interface{}, 0, 6)
args = append(args, "testmetric", "active", "50ms", 10, 10, 0.9, 0.95, 1.0)
f, err := spec.CreateFilter(args)
if err != nil {
t.Fatalf("error creating filter %q: %v", args, err)
return
}
defer f.(*admissionControl).Close()

fr := make(filters.Registry)
fr.Register(spec)

rLeaf := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: argsLeaf}}, Backend: backend.URL}
proxyLeaf := proxytest.New(fr, rLeaf)
defer proxyLeaf.Close()

r := &eskip.Route{Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, Backend: proxyLeaf.URL}
proxy := proxytest.New(fr, r)
defer proxy.Close()

reqURL, err := url.Parse(proxy.URL)
if err != nil {
t.Fatalf("Failed to parse url %s: %v", proxy.URL, err)
}

rt := net.NewTransport(net.Options{})
defer rt.Close()

req, err := http.NewRequest("GET", reqURL.String(), nil)
if err != nil {
t.Error(err)
return
}

N := 1000
var wg sync.WaitGroup
wg.Add(N)
lim := rate.NewLimiter(50, 50)
now := time.Now()
for i := 0; i < N; i++ {
r := lim.Reserve()
for !r.OK() {
time.Sleep(10 * time.Microsecond)
}
go func(i int) {
defer r.Cancel()
defer wg.Done()
rsp, err := rt.RoundTrip(req)
if err != nil {
t.Logf("roundtrip %d: %v", i, err)
} else {
io.Copy(io.Discard, rsp.Body)
rsp.Body.Close()
}
}(i)
}
wg.Wait()
t.Logf("test took: %s", time.Since(now))

var total, totalLeaf, rejects, rejectsLeaf int64
m.WithCounters(func(counters map[string]int64) {
t.Logf("counters: %+v", counters)
var ok bool

total, ok = counters["shedder.admission_control.total.testmetric"]
if !ok {
t.Error("Failed to find total shedder data")
}

rejectsLeaf, ok = counters["shedder.admission_control.reject.testmetric-leaf"]
if !ok {
t.Error("Failed to find rejectsLeaf shedder data")
}

totalLeaf, ok = counters["shedder.admission_control.total.testmetric-leaf"]
if !ok {
t.Error("Failed to find totalLeaf shedder data")
}
})

t.Logf("N: %d", N)
t.Logf("totalLeaf: %d, rejectLeaf: %d", totalLeaf, rejectsLeaf)
t.Logf("total: %d, reject: %d", total, rejects)

epsilon := 0.05
maxExpectedFails := int64(epsilon * float64(N)) // maybe 5% instead of 0.1%
if rejects > maxExpectedFails {
t.Fatalf("Failed to get less rejects as wanted: %d > %d", rejects, maxExpectedFails)
}
}

func TestAdmissionControlCleanupModes(t *testing.T) {
for _, ti := range []struct {
msg string
Expand Down

0 comments on commit e0dd155

Please sign in to comment.