Skip to content

Commit

Permalink
feature: tarpit() filter to let connection to the client open until i…
Browse files Browse the repository at this point in the history
…t closes the connection (#2760)
  • Loading branch information
szuecs authored Dec 6, 2023
1 parent c6deeb8 commit 6f1419f
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 0 deletions.
18 changes: 18 additions & 0 deletions docs/reference/filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,24 @@ the response path.
Same as [chunks filter](#chunks), but on the request path and not on
the response path.

### tarpit

The tarpit filter discards the request and respond with a never ending
stream of chunked response payloads. The goal is to consume the client
connection without letting the client know what is happening.

Parameters:

* time duration (time.Duration)

Example:

```
* -> tarpit("1s") -> <shunt>;
```

The example will send every second a chunk of response payload.

### absorb

The absorb filter reads and discards the payload of the incoming requests.
Expand Down
1 change: 1 addition & 0 deletions filters/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func Filters() []filters.Spec {
diag.NewBackendLatency(),
diag.NewBackendBandwidth(),
diag.NewBackendChunks(),
diag.NewTarpit(),
diag.NewAbsorb(),
diag.NewAbsorbSilent(),
diag.NewLogHeader(),
Expand Down
59 changes: 59 additions & 0 deletions filters/diag/tarpit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package diag

import (
"net/http"
"time"

"github.com/zalando/skipper/filters"
)

type tarpitSpec struct{}

type tarpit struct {
d time.Duration
}

func NewTarpit() filters.Spec {
return &tarpitSpec{}
}

func (t *tarpitSpec) Name() string {
return filters.TarpitName
}

func (t *tarpitSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
if len(args) != 1 {
return nil, filters.ErrInvalidFilterParameters
}
s, ok := args[0].(string)
if !ok {
return nil, filters.ErrInvalidFilterParameters
}

d, err := time.ParseDuration(s)
if err != nil {
return nil, filters.ErrInvalidFilterParameters
}

return &tarpit{d: d}, nil
}

func (t *tarpit) Request(ctx filters.FilterContext) {
ctx.Serve(&http.Response{StatusCode: http.StatusOK, Body: &slowBlockingReader{d: t.d}})
}

func (*tarpit) Response(filters.FilterContext) {}

type slowBlockingReader struct {
d time.Duration
}

func (r *slowBlockingReader) Read(p []byte) (int, error) {
time.Sleep(r.d)
n := copy(p, []byte(" "))
return n, nil
}

func (r *slowBlockingReader) Close() error {
return nil
}
117 changes: 117 additions & 0 deletions filters/diag/tarpit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package diag

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/proxy/proxytest"
)

func TestTarpit(t *testing.T) {
for _, tt := range []struct {
name string
args []interface{}
status int
clientTimeout time.Duration
want error
}{
{
name: "test no args return error",
want: filters.ErrInvalidFilterParameters,
},
{
name: "test wrong arg return error",
args: []interface{}{"no-time-duration"},
want: filters.ErrInvalidFilterParameters,
},
{
name: "test no string arg return error",
args: []interface{}{0x0a},
want: filters.ErrInvalidFilterParameters,
},
{
name: "test wrong number of args return error",
args: []interface{}{"10s", "10ms"},
want: filters.ErrInvalidFilterParameters,
},
{
name: "test 10ms and 1s client timeout",
args: []interface{}{"10ms"},
clientTimeout: time.Second,
want: nil,
},
{
name: "test 1s and 1s client timeout",
args: []interface{}{"1s"},
clientTimeout: time.Second,
want: nil,
},
{
name: "test 1s and 100ms client timeout",
args: []interface{}{"100ms"},
clientTimeout: time.Second,
want: nil,
},
{
name: "test 1s and 3s client timeout",
args: []interface{}{"1s"},
clientTimeout: 3 * time.Second,
want: nil,
}} {
t.Run(tt.name, func(t *testing.T) {
backend := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {

}))
defer backend.Close()

spec := NewTarpit()
_, err := spec.CreateFilter(tt.args)
switch err {
case tt.want:
// ok
if err != nil {
return
}
default:
t.Fatal(err)
}

fr := filters.Registry{}
fr.Register(spec)
sargs := make([]string, 0, len(tt.args))
for _, e := range tt.args {
sargs = append(sargs, e.(string))
}
doc := fmt.Sprintf(`r: * -> tarpit("%s") -> "%s";`, strings.Join(sargs, ","), backend.URL)
r := eskip.MustParse(doc)
p := proxytest.New(fr, r...)
defer p.Close()

N := 1
for i := 0; i < N; i++ {
ctx, done := context.WithTimeout(context.Background(), tt.clientTimeout)
defer done()
req, err := http.NewRequestWithContext(ctx, "GET", p.URL, nil)
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}

rsp, err := p.Client().Do(req)
if err != nil {
t.Fatalf("Failed to get response: %v", err)
}

if rsp.StatusCode != 200 {
t.Fatalf("Failed to get status code 200 got: %d", rsp.StatusCode)
}
}
})
}
}
1 change: 1 addition & 0 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ const (
BackendLatencyName = "backendLatency"
BackendBandwidthName = "backendBandwidth"
BackendChunksName = "backendChunks"
TarpitName = "tarpit"
AbsorbName = "absorb"
AbsorbSilentName = "absorbSilent"
UniformRequestLatencyName = "uniformRequestLatency"
Expand Down

0 comments on commit 6f1419f

Please sign in to comment.