Skip to content

Commit

Permalink
add ability to cap latency for chaff requests profile
Browse files Browse the repository at this point in the history
  • Loading branch information
mikehelmick committed Nov 12, 2021
1 parent 4569185 commit 7738112
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 19 deletions.
61 changes: 42 additions & 19 deletions tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ const (
// (i.e. this library is falling behind or requests volumes are too large),
// then some individual requests will be dropped.
type Tracker struct {
mu sync.RWMutex
buffer []*request
size int
cap int
pos int
ch chan *request
done chan struct{}
resp Responder
mu sync.RWMutex
buffer []*request
size int
cap int
pos int
ch chan *request
done chan struct{}
resp Responder
maxLatencyMs uint64
}

type request struct {
Expand All @@ -67,18 +68,28 @@ func newRequest(start, end time.Time, headerSize, bodySize uint64) *request {
}

// New creates a new tracker with the `DefaultCapacity`.
func New() *Tracker {
t, _ := NewTracker(&PlainResponder{}, DefaultCapacity)
func New(opts ...Option) *Tracker {
t, _ := NewTracker(&PlainResponder{}, DefaultCapacity, opts...)
return t
}

// Option defines a method for applying options when configuring a new tracker.
type Option func(*Tracker)

// WithMaxLatency puts a cap on the tunnel latency.
func WithMaxLatency(maxLatencyMs uint64) Option {
return func(t *Tracker) {
t.maxLatencyMs = maxLatencyMs
}
}

// NewTracker creates a tracker with custom capacity.
// Launches a goroutine to update the request metrics.
// To shut this down, use the .Close() method.
// The Responder parameter is used to write the output. If non is specified,
// the tracker will default to the "PlainResponder" which just writes the raw
// chaff bytes.
func NewTracker(resp Responder, cap int) (*Tracker, error) {
func NewTracker(resp Responder, cap int, opts ...Option) (*Tracker, error) {
if cap < 1 || cap > DefaultCapacity {
return nil, fmt.Errorf("cap must be 1 <= cap <= 100, got: %v", cap)
}
Expand All @@ -88,14 +99,21 @@ func NewTracker(resp Responder, cap int) (*Tracker, error) {
}

t := &Tracker{
buffer: make([]*request, 0, int(cap)),
size: 0,
cap: cap,
pos: 0,
ch: make(chan *request, cap),
done: make(chan struct{}),
resp: resp,
buffer: make([]*request, 0, int(cap)),
size: 0,
cap: cap,
pos: 0,
ch: make(chan *request, cap),
done: make(chan struct{}),
resp: resp,
maxLatencyMs: 0,
}

// Apply options.
for _, opt := range opts {
opt(t)
}

go t.updater()
return t, nil
}
Expand Down Expand Up @@ -152,8 +170,13 @@ func (t *Tracker) CalculateProfile() *request {
}
divisor := uint64(t.size)

latencyMs := latency / divisor
if max := t.maxLatencyMs; max > 0 && latencyMs > max {
latencyMs = max
}

return &request{
latencyMs: latency / divisor,
latencyMs: latencyMs,
headerSize: uint64(hSize / divisor),
bodySize: uint64(bSize / divisor),
}
Expand Down
49 changes: 49 additions & 0 deletions tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"
)

func TestRandomData(t *testing.T) {
t.Parallel()

d := RandomData(0)
if d != "" {
t.Fatalf("expected empty string, got: %q", d)
Expand All @@ -55,6 +58,7 @@ func checkLength(t *testing.T, expected int, length int) {
}

func TestChaff(t *testing.T) {
t.Parallel()
track := New()
defer track.Close()

Expand Down Expand Up @@ -88,6 +92,7 @@ func TestChaff(t *testing.T) {
}

func TestTracking(t *testing.T) {
t.Parallel()
track := New()
defer track.Close()

Expand Down Expand Up @@ -130,7 +135,51 @@ func TestTracking(t *testing.T) {
}
}

func TestMax(t *testing.T) {
t.Parallel()

track := New(WithMaxLatency(25))
defer track.Close()

var wg sync.WaitGroup
for i := 0; i <= DefaultCapacity*2; i++ {
wrapped := track.Track(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(50 * time.Millisecond)
w.WriteHeader(http.StatusAccepted)
w.Header().Add("padding", strings.Repeat("a", i+1))
fmt.Fprintf(w, "%s", strings.Repeat("b", i+1))
}))

recorder := httptest.NewRecorder()
request, err := http.NewRequest("GET", "/", strings.NewReader(""))
if err != nil {
t.Fatalf("http.NewRequest: %v", err)
}

wg.Add(1)
go func(t *testing.T) {
defer wg.Done()
t.Helper()
wrapped.ServeHTTP(recorder, request)
if recorder.Code != http.StatusAccepted {
t.Fatalf("wrong error code: want: %v, got: %v", http.StatusAccepted, recorder.Code)
}
}(t)
}

wg.Wait()

got := track.CalculateProfile()
// Only checking latency
wantLatency := uint64(25)
if diff := cmp.Diff(wantLatency, got.latencyMs); diff != "" {
t.Errorf("mismatch (-want, +got):\n%s", diff)
}
}

func TestJSONMiddleware(t *testing.T) {
t.Parallel()
type result struct {
Name string `json:"name"`
}
Expand Down

0 comments on commit 7738112

Please sign in to comment.