Skip to content

Commit

Permalink
test: add shadow traffic tests (#2649)
Browse files Browse the repository at this point in the history
Signed-off-by: Sandor Szücs <[email protected]>
  • Loading branch information
szuecs authored Oct 10, 2023
1 parent 1756b0f commit d7f84a8
Showing 1 changed file with 398 additions and 0 deletions.
398 changes: 398 additions & 0 deletions proxy/shadow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,398 @@
package proxy

import (
"fmt"
"io"
"math/rand"
"net/http"
stdlibhttptest "net/http/httptest"
"os"
"sync/atomic"
"testing"
"time"

"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/scheduler"
"github.com/zalando/skipper/filters/tee"
"github.com/zalando/skipper/metrics/metricstest"
"github.com/zalando/skipper/net/httptest"
"github.com/zalando/skipper/predicates/primitive"
teepred "github.com/zalando/skipper/predicates/tee"
"github.com/zalando/skipper/predicates/traffic"
"github.com/zalando/skipper/routing"
"github.com/zalando/skipper/routing/testdataclient"
sched "github.com/zalando/skipper/scheduler"
)

func TestShadowSingle(t *testing.T) {
s := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Backend", "main")
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
}))
defer s.Close()

shadow := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Backend", "shadow")
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
}))
defer shadow.Close()

metrics := &metricstest.MockMetrics{}
reg := sched.RegistryWith(sched.Options{
Metrics: metrics,
EnableRouteFIFOMetrics: true,
})
defer reg.Close()

fr := make(filters.Registry)
fr.Register(tee.NewTeeLoopback())
fr.Register(scheduler.NewFifo())

doc := fmt.Sprintf(`
main: PathSubtree("/")
-> "%s";
split: PathSubtree("/") && Traffic(0.5)
-> teeLoopback("test")
-> "%s";
shadow: PathSubtree("/") && Tee("test") && True()
-> "%s";
`, s.URL, s.URL, shadow.URL)

dc, err := testdataclient.NewDoc(doc)
if err != nil {
t.Fatalf("Failed to create testdataclient: %v", err)
}
defer dc.Close()

ro := routing.Options{
SignalFirstLoad: true,
FilterRegistry: fr,
DataClients: []routing.DataClient{dc},
PostProcessors: []routing.PostProcessor{reg},
Predicates: []routing.PredicateSpec{
teepred.New(),
traffic.New(),
primitive.NewTrue(),
},
}
rt := routing.New(ro)
defer rt.Close()
<-rt.FirstLoad()

pr := WithParams(Params{
Routing: rt,
})
defer pr.Close()

ts := stdlibhttptest.NewServer(pr)
defer ts.Close()

N := 1000
for i := 0; i < N; i++ {
rsp, err := ts.Client().Get(ts.URL)
if err != nil {
t.Fatalf("Failed to get response from %s: %v", ts.URL, err)
}
rsp.Body.Close()

if h := rsp.Header.Get("Backend"); h != "main" {
t.Fatalf("wrong response header: %s", h)
}
}

}

func TestShadow(t *testing.T) {
for _, tt := range []struct {
name string
routes string
timeout time.Duration
mainFunc func()
shadowFunc func()
check func(va *httptest.VegetaAttacker) error
debug bool
}{
{
name: "50%",
routes: `
main: PathSubtree("/")
-> "%s";
split: PathSubtree("/") && Traffic(0.5)
-> teeLoopback("test")
-> "%s";
shadow: PathSubtree("/") && Tee("test") && True()
-> "%s";
`,
debug: true,
},
{
name: "50% with fifo",
routes: `
main: PathSubtree("/")
-> fifo(10, 5, "1s")
-> "%s";
split: PathSubtree("/") && Traffic(0.5)
-> fifo(10, 5, "1s")
-> teeLoopback("test")
-> "%s";
shadow: PathSubtree("/") && Tee("test") && True()
-> fifo(10, 5, "1s")
-> "%s";
`,
debug: true,
},
{
name: "50% with fifo and slow shadow",
routes: `
main: PathSubtree("/")
-> fifo(10, 5, "1s")
-> "%s";
split: PathSubtree("/") && Traffic(0.5)
-> fifo(10, 5, "1s")
-> teeLoopback("test")
-> "%s";
shadow: PathSubtree("/") && Tee("test") && True()
-> fifo(10, 5, "1s")
-> "%s";
`,
shadowFunc: func() { time.Sleep(500 * time.Millisecond) },
debug: true,
},
{
name: "50% with fifo and shadow times out",
routes: `
main: PathSubtree("/")
-> fifo(10, 5, "1s")
-> "%s";
split: PathSubtree("/") && Traffic(0.5)
-> fifo(10, 5, "1s")
-> teeLoopback("test")
-> "%s";
shadow: PathSubtree("/") && Tee("test") && True()
-> fifo(10, 5, "1s")
-> "%s";
`,
shadowFunc: func() { time.Sleep(1100 * time.Millisecond) },
debug: true,
},
{
name: "50% with fifo and slow main",
routes: `
main: PathSubtree("/")
-> fifo(10, 5, "1s")
-> "%s";
split: PathSubtree("/") && Traffic(0.5)
-> fifo(10, 5, "1s")
-> teeLoopback("test")
-> "%s";
shadow: PathSubtree("/") && Tee("test") && True()
-> fifo(10, 5, "1s")
-> "%s";
`,
mainFunc: func() { time.Sleep(50 * time.Millisecond) },
debug: true,
},
{
name: "100% shadow with fifo and 25% timing out main",
routes: `
main: PathSubtree("/")
-> fifo(2, 5, "200ms")
-> "%s";
split: PathSubtree("/") && Traffic(1.0)
-> fifo(2, 5, "200ms")
-> teeLoopback("test")
-> "%s";
shadow: PathSubtree("/") && Tee("test") && True()
-> fifo(2, 5, "200ms")
-> "%s";
`,
mainFunc: func() {
if rand.Float64() < 0.25 {
time.Sleep(250 * time.Millisecond)
}
},
debug: true,
check: func(va *httptest.VegetaAttacker) error {
statusFifoFull, _ := va.CountStatus(http.StatusServiceUnavailable)
statusFifoTimeout, _ := va.CountStatus(http.StatusBadGateway)
statusFifoErr, _ := va.CountStatus(http.StatusInternalServerError)

if statusFifoFull == 0 {
return fmt.Errorf("fifo full %d", statusFifoFull)
}
if statusFifoTimeout == 0 {
return fmt.Errorf("fifo timeout %d", statusFifoTimeout)
}
if statusFifoErr != 0 {
return fmt.Errorf("fifo err %d", statusFifoErr)
}
return nil
},
},
{
name: "50% shadow with fifo and 100% timing out main",
routes: `
main: PathSubtree("/")
-> fifo(5, 1, "10ms")
-> "%s";
split: PathSubtree("/") && Traffic(0.5)
-> fifo(5, 1, "10ms")
-> teeLoopback("test")
-> "%s";
shadow: PathSubtree("/") && Tee("test") && True()
-> fifo(5, 1, "10ms")
-> "%s";
`,
mainFunc: func() { time.Sleep(250 * time.Millisecond) },
timeout: 125 * time.Millisecond,
check: func(va *httptest.VegetaAttacker) error {
total := va.TotalRequests()
t.Logf("client observes: total=%d", total)
statusOK, _ := va.CountStatus(http.StatusOK)
statusShadow, _ := va.CountStatus(http.StatusAccepted)
statusClientCancel, _ := va.CountStatus(0)
t.Logf("client observes: main=%d, shadow=%d, clientCancel=%d", statusOK, statusShadow, statusClientCancel)

if statusShadow != 0 {
return fmt.Errorf("client should never get response from shadow, but got %d", statusShadow)
}
if total/2 > uint64(statusClientCancel) {
return fmt.Errorf("most requests should be canceled by client: %d > %d", total-5, statusClientCancel)
}

statusFifoFull, _ := va.CountStatus(http.StatusServiceUnavailable)
statusFifoTimeout, _ := va.CountStatus(http.StatusBadGateway)
statusFifoErr, _ := va.CountStatus(http.StatusInternalServerError)

t.Logf("client observes: statusFifoFull=%d, statusFifoTimeout=%d, statusFifoErr=%d", statusFifoFull, statusFifoTimeout, statusFifoErr)
if statusFifoFull < 2 {
return fmt.Errorf("fifo full %d", statusFifoFull)
}
if statusFifoTimeout < 5 {
return fmt.Errorf("fifo timeout %d", statusFifoTimeout)
}
if statusFifoErr != 0 {
return fmt.Errorf("fifo err %d", statusFifoErr)
}
return nil
},
debug: true,
}} {
t.Run(tt.name, func(t *testing.T) {
if tt.mainFunc == nil {
tt.mainFunc = func() {}
}
if tt.shadowFunc == nil {
tt.shadowFunc = func() {}
}
counterMain := new(atomic.Int64)
counterShadow := new(atomic.Int64)

s := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tt.mainFunc()
counterMain.Add(1)
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
}))
defer s.Close()

shadow := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tt.shadowFunc()
counterShadow.Add(1)
w.WriteHeader(http.StatusAccepted)
w.Write([]byte("OK"))
}))
defer shadow.Close()

metrics := &metricstest.MockMetrics{}
reg := sched.RegistryWith(sched.Options{
Metrics: metrics,
EnableRouteFIFOMetrics: true,
})
defer reg.Close()

fr := make(filters.Registry)
fr.Register(tee.NewTeeLoopback())
fr.Register(scheduler.NewFifo())
fr.Register(scheduler.NewLIFO())

doc := fmt.Sprintf(tt.routes, s.URL, s.URL, shadow.URL)

dc, err := testdataclient.NewDoc(doc)
if err != nil {
t.Fatalf("Failed to create testdataclient: %v", err)
}
defer dc.Close()

ro := routing.Options{
SignalFirstLoad: true,
FilterRegistry: fr,
DataClients: []routing.DataClient{dc},
PostProcessors: []routing.PostProcessor{reg},
Predicates: []routing.PredicateSpec{
teepred.New(),
traffic.New(),
primitive.NewTrue(),
},
}
rt := routing.New(ro)
defer rt.Close()
<-rt.FirstLoad()

pr := WithParams(Params{
Routing: rt,
})
defer pr.Close()

ts := stdlibhttptest.NewServer(pr)
defer ts.Close()

rate := 10
duration := 1 * time.Second
per := 100 * time.Millisecond
timeout := 500 * time.Millisecond
if tt.timeout != 0 {
timeout = tt.timeout
}
N := rate * (int(duration / per))

va := httptest.NewVegetaAttacker(ts.URL, rate, per, timeout)
out := io.Discard
if tt.debug {
out = os.Stderr
}
va.Attack(out, duration, "mytest")

t.Logf("backends observe: counter main=%d, counter shadow=%d", counterMain.Load(), counterShadow.Load())

if tt.check == nil {
reqCount := va.TotalRequests()
t.Logf("client observes: total=%d, expected=%d", reqCount, N)
statusOK, _ := va.CountStatus(http.StatusOK)
statusShadow, _ := va.CountStatus(http.StatusAccepted)
statusClientCancel, _ := va.CountStatus(0)
t.Logf("client observes: main=%d, shadow=%d, clientCancel=%d", statusOK, statusShadow, statusClientCancel)
if statusShadow != 0 {
t.Fatalf("Client should never get response from shadow, but got %d", statusShadow)
}

statusFifoFull, _ := va.CountStatus(http.StatusServiceUnavailable)
statusFifoTimeout, _ := va.CountStatus(http.StatusBadGateway)
statusFifoErr, _ := va.CountStatus(http.StatusInternalServerError)
t.Logf("client observes: Qfull=%d, Qtimeout=%d, Qerr=%d", statusFifoFull, statusFifoTimeout, statusFifoErr)
if statusOK != int(reqCount) || statusOK != N {
t.Fatalf("%d != %d or %d != %d", statusOK, reqCount, statusOK, N)
}
if n := counterMain.Load(); int64(N) != n {
t.Fatalf("Failed to get all requests into main expected: %d, got: %d", N, n)
}
} else {
if err := tt.check(va); err != nil {
t.Fatalf("Failed to check: %v", err)
}
}
})
}
}

0 comments on commit d7f84a8

Please sign in to comment.