feat: lightweight throttling measurements (#1166)
This diff implements a lightweight approach to throttling that takes
advantage of the step-by-step design and should also be suitable to
measure throttling using `dslx` (see
ooni/probe#2493).

Before discussing the approach implemented here, it is important to
point out that:

1. if we're using step-by-step, we're collecting up to 64 network events
for a single network connection;

2. with step-by-step, each trace is bound to a single network connection
or DNS round trip;

3. both Web Connectivity v0.5 and dslx use the step-by-step approach;

4. therefore, for extreme throttling of a single connection, 64 I/O
events are *a lot of events* to observe throttling;

5. additionally, we're currently limited to downloading `1<<19` bytes (512 KiB) of
the body, so there is not much room for collecting *lots of data*
anyway;

6. additionally, if we were to collect more bytes, the bottleneck would
become collecting and uploading the HTTP response body to the OONI
backend.

That said, by exploiting the fact that step-by-step means that a trace
is bound to a single network connection, we can add passive atomic
collection of the bytes received by a trace. Because we're dealing with
unconnected UDP sockets, we also need to be careful to attribute the
bytes we receive to the peer that actually sent them. To this end, we
maintain a map from the remote endpoint address and protocol to the
number of bytes received. The trace allows one to export the current
map. Because data collection is passive, we can start as late as the
HTTP download and we would still collect correct cumulative data.
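
As a rough illustration of the accounting described above, here is a minimal, self-contained sketch. The `byteCounter` type and its methods are hypothetical names invented for this example; they are not the `measurexlite` API, which lives on the trace itself. The sketch assumes the same "address, then protocol" key layout and the same folding of `tcp4`/`tcp6` and `udp4`/`udp6` into `tcp` and `udp` that the diff in this commit uses.

```go
package main

import (
	"fmt"
	"sync"
)

// byteCounter is a hypothetical stand-in for the per-trace accounting:
// a mutex-protected map from "ADDRESS PROTO" to cumulative bytes received.
type byteCounter struct {
	mu sync.Mutex
	m  map[string]int64
}

func newByteCounter() *byteCounter {
	return &byteCounter{m: make(map[string]int64)}
}

// add attributes count bytes to the peer identified by network and address.
func (bc *byteCounter) add(network, address string, count int) {
	// fold the IP-version-specific network names into "tcp" and "udp"
	switch network {
	case "udp4", "udp6":
		network = "udp"
	case "tcp4", "tcp6":
		network = "tcp"
	}
	key := address + " " + network
	bc.mu.Lock()
	bc.m[key] += int64(count)
	bc.mu.Unlock()
}

// snapshot returns a copy of the map so callers never share state
// with concurrent readers of the connection.
func (bc *byteCounter) snapshot() map[string]int64 {
	out := make(map[string]int64)
	bc.mu.Lock()
	defer bc.mu.Unlock()
	for k, v := range bc.m {
		out[k] = v
	}
	return out
}

func main() {
	bc := newByteCounter()
	bc.add("tcp4", "1.2.3.4:443", 100)
	bc.add("tcp", "1.2.3.4:443", 28)
	bc.add("udp", "[::1]:53", 64)
	snap := bc.snapshot()
	fmt.Println(snap["1.2.3.4:443 tcp"], snap["[::1]:53 udp"]) // prints "128 64"
}
```

Because the counters are cumulative, a consumer that starts taking snapshots late still sees the total for each peer, which is what makes the late start mentioned above safe.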

We also introduce a new sampler for measuring throttling. The design of
the sampler is similar to the design we're using inside of ndt7. We use
a memoryless ticker to avoid sampling at fixed intervals, but we clamp the
distribution so that we will typically collect the expected number of
samples in each time period.

It is also worth noting that I believe the already collected 64 network
events are fine to determine throttling, but we cannot know for sure,
hence it makes sense to improve our data collection capabilities.
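
For context, here is one way the already collected read events could be turned into a speed estimate. This is only a sketch under stated assumptions: `readEvent` is a hypothetical, simplified view of a network event (a timestamp in seconds since the zero time plus the number of bytes returned by the read), and `averageSpeed` is an invented helper, not part of the probe.

```go
package main

import "fmt"

// readEvent is a hypothetical, simplified view of a read network event:
// T is the time in seconds at which the read finished and NumBytes is
// the number of bytes that the read returned.
type readEvent struct {
	T        float64
	NumBytes int64
}

// averageSpeed returns the average speed in bytes per second between the
// first and the last event, or zero when the events do not span any time.
func averageSpeed(events []readEvent) float64 {
	if len(events) < 2 {
		return 0
	}
	elapsed := events[len(events)-1].T - events[0].T
	if elapsed <= 0 {
		return 0
	}
	// the bytes of the first event arrived before the interval started
	var total int64
	for _, ev := range events[1:] {
		total += ev.NumBytes
	}
	return float64(total) / elapsed
}

func main() {
	events := []readEvent{{0.0, 1000}, {1.0, 1000}, {2.0, 2000}}
	fmt.Println(averageSpeed(events)) // prints "1500"
}
```

With at most 64 events per connection, an estimate like this only covers the beginning of a long download, which is the gap that the dedicated sampler is meant to close.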

The related spec PR is ooni/spec#276.

Once this diff is merged, we would still need to do the following:

- [ ] update dslx to use this functionality
- [ ] land Web Connectivity LTE

The latter is required to collect speed samples, because we're not
collecting them with Web Connectivity v0.4.

While there, this diff also improves the measurexlite documentation a
bit.
bassosimone committed Jun 29, 2023
1 parent 1428fb1 commit 7b88651
Showing 11 changed files with 547 additions and 48 deletions.
8 changes: 8 additions & 0 deletions internal/experiment/webconnectivitylte/cleartextflow.go
@@ -19,6 +19,7 @@ import (
"github.com/ooni/probe-cli/v3/internal/measurexlite"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/throttling"
)

// Measures HTTP endpoints.
@@ -98,6 +99,13 @@ func (t *CleartextFlow) Run(parentCtx context.Context, index int64) error {
// create trace
trace := measurexlite.NewTrace(index, t.ZeroTime)

// start measuring throttling
sampler := throttling.NewSampler(trace)
defer func() {
t.TestKeys.AppendNetworkEvents(sampler.ExtractSamples()...)
sampler.Close()
}()

// start the operation logger
ol := measurexlite.NewOperationLogger(
t.Logger, "[#%d] GET http://%s using %s", index, t.HostHeader, t.Address,
8 changes: 8 additions & 0 deletions internal/experiment/webconnectivitylte/secureflow.go
@@ -20,6 +20,7 @@ import (
"github.com/ooni/probe-cli/v3/internal/measurexlite"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/throttling"
)

// Measures HTTPS endpoints.
@@ -105,6 +106,13 @@ func (t *SecureFlow) Run(parentCtx context.Context, index int64) error {
// create trace
trace := measurexlite.NewTrace(index, t.ZeroTime)

// start measuring throttling
sampler := throttling.NewSampler(trace)
defer func() {
t.TestKeys.AppendNetworkEvents(sampler.ExtractSamples()...)
sampler.Close()
}()

// start the operation logger
ol := measurexlite.NewOperationLogger(
t.Logger, "[#%d] GET https://%s using %s", index, t.HostHeader, t.Address,
60 changes: 58 additions & 2 deletions internal/measurexlite/conn.go
@@ -5,14 +5,15 @@ package measurexlite
//

import (
"fmt"
"net"
"time"

"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)

// MaybeClose is a convenience function for closing a conn only when such a conn isn't nil.
// MaybeClose is a convenience function for closing a [net.Conn] when it is not nil.
func MaybeClose(conn net.Conn) (err error) {
if conn != nil {
err = conn.Close()
@@ -40,12 +41,15 @@ var _ net.Conn = &connTrace{}

// Read implements net.Conn.Read and saves network events.
func (c *connTrace) Read(b []byte) (int, error) {
// collect preliminary stats when the connection is surely active
network := c.RemoteAddr().Network()
addr := c.RemoteAddr().String()
started := c.tx.TimeSince(c.tx.ZeroTime)

// perform the underlying network operation
count, err := c.Conn.Read(b)

// emit the network event
finished := c.tx.TimeSince(c.tx.ZeroTime)
select {
case c.tx.networkEvent <- NewArchivalNetworkEvent(
@@ -54,9 +58,45 @@ func (c *connTrace) Read(b []byte) (int, error) {
default: // buffer is full
}

// update per receiver statistics
c.tx.updateBytesReceivedMapNetConn(network, addr, count)

// return to the caller
return count, err
}

// updateBytesReceivedMapNetConn updates the [*Trace] bytes received map for a [net.Conn].
func (tx *Trace) updateBytesReceivedMapNetConn(network, address string, count int) {
// normalize the network name
switch network {
case "udp", "udp4", "udp6":
network = "udp"
case "tcp", "tcp4", "tcp6":
network = "tcp"
}

// create the key for inserting inside the map
key := fmt.Sprintf("%s %s", address, network)

// lock and insert into the map
tx.bytesReceivedMu.Lock()
tx.bytesReceivedMap[key] += int64(count)
tx.bytesReceivedMu.Unlock()
}

// CloneBytesReceivedMap returns a clone of the internal bytes received map. The key
// of the map is a string following the "EPNT_ADDRESS PROTO" pattern where the "EPNT_ADDRESS"
// contains the endpoint address and "PROTO" is "tcp" or "udp".
func (tx *Trace) CloneBytesReceivedMap() (out map[string]int64) {
out = make(map[string]int64)
tx.bytesReceivedMu.Lock()
for key, value := range tx.bytesReceivedMap {
out[key] = value
}
tx.bytesReceivedMu.Unlock()
return
}

// Write implements net.Conn.Write and saves network events.
func (c *connTrace) Write(b []byte) (int, error) {
network := c.RemoteAddr().Network()
@@ -76,7 +116,7 @@ func (c *connTrace) Write(b []byte) (int, error) {
return count, err
}

// MaybeUDPLikeClose is a convenience function for closing a conn only when such a conn isn't nil.
// MaybeCloseUDPLikeConn is a convenience function for closing a [model.UDPLikeConn] when it is not nil.
func MaybeCloseUDPLikeConn(conn model.UDPLikeConn) (err error) {
if conn != nil {
err = conn.Close()
@@ -102,10 +142,13 @@ type udpLikeConnTrace struct {

// ReadFrom implements model.UDPLikeConn.ReadFrom and saves network events.
func (c *udpLikeConnTrace) ReadFrom(b []byte) (int, net.Addr, error) {
// record when we started measuring
started := c.tx.TimeSince(c.tx.ZeroTime)

// perform the network operation
count, addr, err := c.UDPLikeConn.ReadFrom(b)

// emit the network event
finished := c.tx.TimeSince(c.tx.ZeroTime)
address := addrStringIfNotNil(addr)
select {
@@ -115,9 +158,22 @@ func (c *udpLikeConnTrace) ReadFrom(b []byte) (int, net.Addr, error) {
default: // buffer is full
}

// possibly collect a download speed sample
c.tx.maybeUpdateBytesReceivedMapUDPLikeConn(addr, count)

// return results to the caller
return count, addr, err
}

// maybeUpdateBytesReceivedMapUDPLikeConn updates the [*Trace] bytes received map for a [model.UDPLikeConn].
func (tx *Trace) maybeUpdateBytesReceivedMapUDPLikeConn(addr net.Addr, count int) {
// Implementation note: the address may be nil if the operation failed given that we don't
// have a fixed peer address for UDP connections
if addr != nil {
tx.updateBytesReceivedMapNetConn(addr.Network(), addr.String(), count)
}
}

// WriteTo implements model.UDPLikeConn.WriteTo and saves network events.
func (c *udpLikeConnTrace) WriteTo(b []byte, addr net.Addr) (int, error) {
started := c.tx.TimeSince(c.tx.ZeroTime)
91 changes: 90 additions & 1 deletion internal/measurexlite/conn_test.go
@@ -103,6 +103,17 @@ func TestWrapNetConn(t *testing.T) {
if err != nil {
t.Fatal("invalid err")
}

t.Run("we update the trace's byte received map", func(t *testing.T) {
stats := trace.CloneBytesReceivedMap()
if len(stats) != 1 {
t.Fatal("expected to see just one entry")
}
if stats["1.1.1.1:443 tcp"] != 128 {
t.Fatal("expected to know we received 128 bytes")
}
})

events := trace.NetworkEvents()
if len(events) != 1 {
t.Fatal("did not save network events")
@@ -264,6 +275,9 @@ func TestWrapUDPLikeConn(t *testing.T) {
MockString: func() string {
return "1.1.1.1:443"
},
MockNetwork: func() string {
return "udp"
},
}, nil
},
}
@@ -284,6 +298,17 @@ func TestWrapUDPLikeConn(t *testing.T) {
if err != nil {
t.Fatal("invalid err")
}

t.Run("we update the trace's byte received map", func(t *testing.T) {
stats := trace.CloneBytesReceivedMap()
if len(stats) != 1 {
t.Fatal("expected to see just one entry")
}
if stats["1.1.1.1:443 udp"] != 128 {
t.Fatal("expected to know we received 128 bytes")
}
})

events := trace.NetworkEvents()
if len(events) != 1 {
t.Fatal("did not save network events")
@@ -310,6 +335,9 @@ func TestWrapUDPLikeConn(t *testing.T) {
MockString: func() string {
return "1.1.1.1:443"
},
MockNetwork: func() string {
return "udp"
},
}, nil
},
}
@@ -379,7 +407,7 @@ func TestWrapUDPLikeConn(t *testing.T) {
}
})

t.Run("Write discards the event when the buffer is full", func(t *testing.T) {
t.Run("WriteTo discards the event when the buffer is full", func(t *testing.T) {
underlying := &mocks.UDPLikeConn{
MockWriteTo: func(b []byte, addr net.Addr) (int, error) {
return len(b), nil
@@ -477,3 +505,64 @@ func TestNewAnnotationArchivalNetworkEvent(t *testing.T) {
t.Fatal(diff)
}
}

func TestTrace_updateBytesReceivedMapNetConn(t *testing.T) {
t.Run("we handle tcp4, tcp6, udp4 and udp6 like they were tcp and udp", func(t *testing.T) {
// create a new trace
tx := NewTrace(0, time.Now())

// insert stats for tcp, tcp4 and tcp6
tx.updateBytesReceivedMapNetConn("tcp", "1.2.3.4:5678", 10)
tx.updateBytesReceivedMapNetConn("tcp4", "1.2.3.4:5678", 100)
tx.updateBytesReceivedMapNetConn("tcp", "[::1]:5678", 10)
tx.updateBytesReceivedMapNetConn("tcp6", "[::1]:5678", 100)

// insert stats for udp, udp4 and udp6
tx.updateBytesReceivedMapNetConn("udp", "1.2.3.4:5678", 10)
tx.updateBytesReceivedMapNetConn("udp4", "1.2.3.4:5678", 100)
tx.updateBytesReceivedMapNetConn("udp", "[::1]:5678", 10)
tx.updateBytesReceivedMapNetConn("udp6", "[::1]:5678", 100)

// make sure the result is the expected one
expected := map[string]int64{
"1.2.3.4:5678 tcp": 110,
"[::1]:5678 tcp": 110,
"1.2.3.4:5678 udp": 110,
"[::1]:5678 udp": 110,
}
got := tx.CloneBytesReceivedMap()
if diff := cmp.Diff(expected, got); diff != "" {
t.Fatal(diff)
}
})
}

func TestTrace_maybeUpdateBytesReceivedMapUDPLikeConn(t *testing.T) {
t.Run("we ignore cases where the address is nil", func(t *testing.T) {
// create a new trace
tx := NewTrace(0, time.Now())

// insert stats with a nil address
tx.maybeUpdateBytesReceivedMapUDPLikeConn(nil, 128)

// insert stats with a good address
goodAddr := &mocks.Addr{
MockString: func() string {
return "1.2.3.4:5678"
},
MockNetwork: func() string {
return "udp"
},
}
tx.maybeUpdateBytesReceivedMapUDPLikeConn(goodAddr, 128)

// make sure the result is the expected one
expected := map[string]int64{
"1.2.3.4:5678 udp": 128,
}
got := tx.CloneBytesReceivedMap()
if diff := cmp.Diff(expected, got); diff != "" {
t.Fatal(diff)
}
})
}
4 changes: 0 additions & 4 deletions internal/measurexlite/dialer.go
@@ -18,10 +18,6 @@ import (

// NewDialerWithoutResolver is equivalent to netxlite.NewDialerWithoutResolver
// except that it returns a model.Dialer that uses this trace.
//
// Note: unlike code in netx or measurex, this factory DOES NOT return you a
// dialer that also performs wrapping of a net.Conn in case of success. If you
// want to wrap the conn, you need to wrap it explicitly using model.Trace.WrapNetConn.
func (tx *Trace) NewDialerWithoutResolver(dl model.DebugLogger) model.Dialer {
return &dialerTrace{
d: tx.newDialerWithoutResolver(dl),
5 changes: 3 additions & 2 deletions internal/measurexlite/dns.go
@@ -267,8 +267,9 @@ func (tx *Trace) FirstDNSLookup() *model.ArchivalDNSLookupResult {
return ev[0]
}

// ErrDelayedDNSResponseBufferFull indicates that the delayedDNSResponse buffer is full.
var ErrDelayedDNSResponseBufferFull = errors.New("buffer full")
// ErrDelayedDNSResponseBufferFull indicates that the delayed-DNS-response channel buffer is full.
var ErrDelayedDNSResponseBufferFull = errors.New(
"measurexlite: the delayed-DNS-response channel buffer is full")

// OnDelayedDNSResponse implements model.Trace.OnDelayedDNSResponse
func (tx *Trace) OnDelayedDNSResponse(started time.Time, txp model.DNSTransport, query model.DNSQuery,
48 changes: 41 additions & 7 deletions internal/measurexlite/doc.go
@@ -1,10 +1,44 @@
// Package measurexlite contains measurement extensions.
// Package measurexlite contains measurement extensions. This package is named "measurex lite"
// because it implements a lightweight approach compared to a previous package named "measurex".
//
// See docs/design/dd-003-step-by-step.md in the ooni/probe-cli
// repository for the design document.
// [measurexlite] implements the [dd-003-step-by-step.md] design document. The fundamental data type
// is the [*Trace], which saves events in buffered channels. The [NewTrace] constructor creates
// channels with sufficient capacity for tracing all the events we expect to see for a single use
// connection or for a DNS round trip. If you are not draining the channels, the [*Trace] will
// eventually stop collecting events, though.
//
// This implementation features a Trace that saves events in
// buffered channels as proposed by df-003-step-by-step.md. We
// have reasonable default buffers for channels. But, if you
// are not draining them, eventually we stop collecting events.
// As mentioned above, the expectation is that a [*Trace] will only trace a single use connection or
// a DNS round trip. Typically, you create a distinct trace for each TCP-TLS-HTTP or TCP-HTTP or
// QUIC-HTTP or DNS-lookup-with-getaddrinfo or DNS-lookup-with-UDP sequence of operations. There is
// a "trace ID" for each trace, which you provide to [NewTrace]. This ID is copied into the
// "transaction_id" field of the archival network events. Therefore, by using distinct trace IDs
// for distinct operations, you enable [ooni/data] to group related events together.
//
// The [*Trace] features methods that mirror existing [netxlite] methods but implement support for
// collecting network events using the [*Trace]. For example, [*Trace.NewStdlibResolver] is like
// [netxlite.NewStdlibResolver] but the DNS lookups performed with the resolver returned by
// [*Trace.NewStdlibResolver] generate events that you can collect using the [*Trace].
//
// As mentioned above, internally, the [*Trace] uses buffered channels on which the underlying
// network objects attempt to write when there is an interesting event. As a user of the
// [measurexlite] package, you have methods to extract the events from the [*Trace] channels,
// such as, for example:
//
// - [*Trace.DNSLookupsFromRoundTrip]
//
// - [*Trace.NetworkEvents]
//
// - [*Trace.TCPConnects]
//
// - [*Trace.QUICHandshakes]
//
// - [*Trace.TLSHandshakes]
//
// These methods already return data structures using the archival data format implemented
// by the [model] package and specified in the [ooni/spec] repository. Hence, these structures
// are ready to be added to OONI measurements.
//
// [dd-003-step-by-step.md]: https://github.com/ooni/probe-cli/blob/master/docs/design/dd-003-step-by-step.md
// [ooni/data]: https://github.com/ooni/data
// [ooni/spec]: https://github.com/ooni/spec
package measurexlite
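
The package documentation above describes buffered channels that stop accepting events once they are full. The following standalone sketch shows that pattern in isolation; `trySend` and `drain` are hypothetical helpers written for this example and are not functions of the `measurexlite` package. The sketch assumes the same non-blocking `select` with a `default` branch that the `Read` hunk in `conn.go` uses.

```go
package main

import "fmt"

// trySend attempts to write ev to ch without blocking and reports
// whether the event was stored or dropped because the buffer was full.
func trySend(ch chan<- string, ev string) bool {
	select {
	case ch <- ev:
		return true
	default: // buffer is full
		return false
	}
}

// drain extracts all the events currently buffered in ch without blocking.
func drain(ch <-chan string) (out []string) {
	for {
		select {
		case ev := <-ch:
			out = append(out, ev)
		default:
			return
		}
	}
}

func main() {
	ch := make(chan string, 2) // tiny buffer to show the overflow case
	for _, ev := range []string{"connect", "read", "write"} {
		fmt.Println(ev, trySend(ch, ev))
	}
	fmt.Println(drain(ch))
}
```

With a buffer of two, the third event is dropped rather than blocking the I/O path, which is the trade-off the documentation refers to when it says the trace eventually stops collecting events unless the channels are drained.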
3 changes: 1 addition & 2 deletions internal/measurexlite/quic.go
@@ -107,8 +107,7 @@ func (tx *Trace) FirstQUICHandshakeOrNil() *model.ArchivalTLSOrQUICHandshakeResu
return ev[0]
}

// MaybeCloseQUICConn is a convenience function for closing a quic.EarlyConnection only when such a conn
// isn't nil.
// MaybeCloseQUICConn is a convenience function for closing a [quic.EarlyConnection] when it is not nil.
func MaybeCloseQUICConn(conn quic.EarlyConnection) (err error) {
if conn != nil {
err = conn.CloseWithError(0, "")