Skip to content

Commit

Permalink
feat(dslx): collect speed samples (#1167)
Browse files Browse the repository at this point in the history
  • Loading branch information
bassosimone committed Jun 29, 2023
1 parent 7b88651 commit e00ba0e
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 1 deletion.
11 changes: 11 additions & 0 deletions internal/dslx/httpcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ooni/probe-cli/v3/internal/measurexlite"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/throttling"
)

// HTTPTransport is an HTTP transport bound to a TCP, TLS or QUIC connection
Expand Down Expand Up @@ -306,8 +307,18 @@ func (f *httpRequestFunc) do(
var body []byte
if err == nil {
defer resp.Body.Close()

// create sampler for measuring throttling
sampler := throttling.NewSampler(input.Trace)
defer sampler.Close()

// read a snapshot of the response body
reader := io.LimitReader(resp.Body, maxbody)
body, err = netxlite.ReadAllContext(ctx, reader) // TODO: enable streaming and measure speed

// collect and save download speed samples
samples := sampler.ExtractSamples()
observations[0].NetworkEvents = append(observations[0].NetworkEvents, samples...)
}
finished := input.Trace.TimeSince(input.Trace.ZeroTime)

Expand Down
75 changes: 75 additions & 0 deletions internal/dslx/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package dslx

import (
"context"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"

"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/randx"
"github.com/ooni/probe-cli/v3/internal/throttling"
)

func TestMakeSureWeCollectSpeedSamples(t *testing.T) {
const (
chunkSize = 1 << 14
repetitions = 10
totalBody = repetitions * chunkSize
traceID = 14
)

// create a testing server that sleeps after each send for a given number of sends
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
chunk := []byte(randx.Letters(chunkSize))
for idx := 0; idx < repetitions; idx++ {
w.Write(chunk)
time.Sleep(250 * time.Millisecond)
}
}))
defer server.Close()

// instantiate a connection pool
pool := &ConnPool{}
defer pool.Close()

// create a measuring function
f0 := Compose3(
TCPConnect(pool),
HTTPTransportTCP(),
HTTPRequest(),
)

// create the endpoint to measure
epnt := &Endpoint{
Address: server.Listener.Addr().String(),
Domain: "",
IDGenerator: &atomic.Int64{},
Logger: model.DiscardLogger,
Network: "tcp",
Tags: []string{},
ZeroTime: time.Now(),
}

// measure the endpoint
result := f0.Apply(context.Background(), epnt)

// get observations
observations := ExtractObservations(result)

// process the network events and check for summary
var foundSummary bool
for _, entry := range observations {
for _, ev := range entry.NetworkEvents {
if ev.Operation == throttling.BytesReceivedCumulativeOperation {
t.Log(ev)
foundSummary = true
}
}
}
if !foundSummary {
t.Fatal("did not find the summary")
}
}
2 changes: 1 addition & 1 deletion internal/throttling/throttling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestSamplerWorkingAsIntended(t *testing.T) {
traceID = 14
)

// create a testing server that sleeps after each sender for a given number of sends
// create a testing server that sleeps after each send for a given number of sends
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
chunk := []byte(randx.Letters(chunkSize))
for idx := 0; idx < repetitions; idx++ {
Expand Down

0 comments on commit e00ba0e

Please sign in to comment.