From 7b886518d7929b454135674e0bb8dcc9388b621b Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Thu, 29 Jun 2023 13:57:04 +0200 Subject: [PATCH] feat: lightweight throttling measurements (#1166) This diff implements a lightweight approach to measuring throttling that takes advantage of the step-by-step design and should also be suitable to measure throttling using `dslx` (see https://github.com/ooni/probe/issues/2493). Before discussing the approach implemented here, it is important to point out that: 1. if we're using step-by-step, we're collecting up to 64 network events for a single network connection; 2. with step-by-step, each trace is bound to a single network connection or DNS round trip; 3. both Web Connectivity v0.5 and dslx use the step-by-step approach; 4. therefore, for extreme throttling of a single connection, 64 I/O events are *a lot of events* to observe throttling; 5. additionally, we're currently limited to downloading `1<<19` bytes of the body, so there is not much room for collecting *lots of data* anyway; 6. additionally, if we were to collect more bytes, the bottleneck would become collecting and uploading the HTTP response body to the OONI backend. That said, by exploiting the fact that step-by-step means that a trace is bound to a single network connection, we can add passive atomic collection of the bytes received by a trace. Because we're dealing with unconnected UDP sockets, we also need to be careful to attribute the bytes received to the peer that actually sent them. To this end, we maintain a map from the remote endpoint address and protocol to the number of bytes received. The trace allows one to export the current map. Because data collection is passive, we can start as late as the HTTP download and we would still collect correct cumulative data. We also introduce a new sampler for measuring throttling. The design of the sampler is similar to the design we're using inside of ndt7. 
We use a memoryless ticker to avoid sampling periodically but we clamp the distribution such that we will typically receive the expected number of samples for each time period. It is also worth noting that I believe the already collected 64 network events are fine to determine throttling, but we cannot know for sure, hence it makes sense to improve our data collection capabilities. The related spec PR is https://github.com/ooni/spec/pull/276. Once this diff is merged, we would still need to do the following: - [ ] update dslx to use this functionality - [ ] land Web Connectivity LTE The latter is fundamental to collect speed samples. We're not doing that with Web Connectivity v0.4. While there, this diff also improves the measurexlite documentation a bit. --- .../webconnectivitylte/cleartextflow.go | 8 + .../webconnectivitylte/secureflow.go | 8 + internal/measurexlite/conn.go | 60 ++++++- internal/measurexlite/conn_test.go | 91 ++++++++++- internal/measurexlite/dialer.go | 4 - internal/measurexlite/dns.go | 5 +- internal/measurexlite/doc.go | 48 +++++- internal/measurexlite/quic.go | 3 +- internal/measurexlite/trace.go | 65 ++++---- internal/throttling/throttling.go | 150 +++++++++++++++++ internal/throttling/throttling_test.go | 153 ++++++++++++++++++ 11 files changed, 547 insertions(+), 48 deletions(-) create mode 100644 internal/throttling/throttling.go create mode 100644 internal/throttling/throttling_test.go diff --git a/internal/experiment/webconnectivitylte/cleartextflow.go b/internal/experiment/webconnectivitylte/cleartextflow.go index a7d5d2b760..fdb20e281b 100644 --- a/internal/experiment/webconnectivitylte/cleartextflow.go +++ b/internal/experiment/webconnectivitylte/cleartextflow.go @@ -19,6 +19,7 @@ import ( "github.com/ooni/probe-cli/v3/internal/measurexlite" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/ooni/probe-cli/v3/internal/throttling" ) // Measures HTTP endpoints. 
@@ -98,6 +99,13 @@ func (t *CleartextFlow) Run(parentCtx context.Context, index int64) error { // create trace trace := measurexlite.NewTrace(index, t.ZeroTime) + // start measuring throttling + sampler := throttling.NewSampler(trace) + defer func() { + t.TestKeys.AppendNetworkEvents(sampler.ExtractSamples()...) + sampler.Close() + }() + // start the operation logger ol := measurexlite.NewOperationLogger( t.Logger, "[#%d] GET http://%s using %s", index, t.HostHeader, t.Address, diff --git a/internal/experiment/webconnectivitylte/secureflow.go b/internal/experiment/webconnectivitylte/secureflow.go index 99cd273c0b..a1079aae11 100644 --- a/internal/experiment/webconnectivitylte/secureflow.go +++ b/internal/experiment/webconnectivitylte/secureflow.go @@ -20,6 +20,7 @@ import ( "github.com/ooni/probe-cli/v3/internal/measurexlite" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/netxlite" + "github.com/ooni/probe-cli/v3/internal/throttling" ) // Measures HTTPS endpoints. @@ -105,6 +106,13 @@ func (t *SecureFlow) Run(parentCtx context.Context, index int64) error { // create trace trace := measurexlite.NewTrace(index, t.ZeroTime) + // start measuring throttling + sampler := throttling.NewSampler(trace) + defer func() { + t.TestKeys.AppendNetworkEvents(sampler.ExtractSamples()...) + sampler.Close() + }() + // start the operation logger ol := measurexlite.NewOperationLogger( t.Logger, "[#%d] GET https://%s using %s", index, t.HostHeader, t.Address, diff --git a/internal/measurexlite/conn.go b/internal/measurexlite/conn.go index 50fe0de49f..deb14293cf 100644 --- a/internal/measurexlite/conn.go +++ b/internal/measurexlite/conn.go @@ -5,6 +5,7 @@ package measurexlite // import ( + "fmt" "net" "time" @@ -12,7 +13,7 @@ import ( "github.com/ooni/probe-cli/v3/internal/netxlite" ) -// MaybeClose is a convenience function for closing a conn only when such a conn isn't nil. 
+// MaybeClose is a convenience function for closing a [net.Conn] when it is not nil. func MaybeClose(conn net.Conn) (err error) { if conn != nil { err = conn.Close() @@ -40,12 +41,15 @@ var _ net.Conn = &connTrace{} // Read implements net.Conn.Read and saves network events. func (c *connTrace) Read(b []byte) (int, error) { + // collect preliminary stats when the connection is surely active network := c.RemoteAddr().Network() addr := c.RemoteAddr().String() started := c.tx.TimeSince(c.tx.ZeroTime) + // perform the underlying network operation count, err := c.Conn.Read(b) + // emit the network event finished := c.tx.TimeSince(c.tx.ZeroTime) select { case c.tx.networkEvent <- NewArchivalNetworkEvent( @@ -54,9 +58,45 @@ func (c *connTrace) Read(b []byte) (int, error) { default: // buffer is full } + // update per receiver statistics + c.tx.updateBytesReceivedMapNetConn(network, addr, count) + + // return to the caller return count, err } +// updateBytesReceivedMapNetConn updates the [*Trace] bytes received map for a [net.Conn]. +func (tx *Trace) updateBytesReceivedMapNetConn(network, address string, count int) { + // normalize the network name + switch network { + case "udp", "udp4", "udp6": + network = "udp" + case "tcp", "tcp4", "tcp6": + network = "tcp" + } + + // create the key for inserting inside the map + key := fmt.Sprintf("%s %s", address, network) + + // lock and insert into the map + tx.bytesReceivedMu.Lock() + tx.bytesReceivedMap[key] += int64(count) + tx.bytesReceivedMu.Unlock() +} + +// CloneBytesReceivedMap returns a clone of the internal bytes received map. The key +// of the map is a string following the "EPNT_ADDRESS PROTO" pattern where the "EPNT_ADDRESS" +// contains the endpoint address and "PROTO" is "tcp" or "udp". 
+func (tx *Trace) CloneBytesReceivedMap() (out map[string]int64) { + out = make(map[string]int64) + tx.bytesReceivedMu.Lock() + for key, value := range tx.bytesReceivedMap { + out[key] = value + } + tx.bytesReceivedMu.Unlock() + return +} + // Write implements net.Conn.Write and saves network events. func (c *connTrace) Write(b []byte) (int, error) { network := c.RemoteAddr().Network() @@ -76,7 +116,7 @@ func (c *connTrace) Write(b []byte) (int, error) { return count, err } -// MaybeUDPLikeClose is a convenience function for closing a conn only when such a conn isn't nil. +// MaybeCloseUDPLikeConn is a convenience function for closing a [model.UDPLikeConn] when it is not nil. func MaybeCloseUDPLikeConn(conn model.UDPLikeConn) (err error) { if conn != nil { err = conn.Close() @@ -102,10 +142,13 @@ type udpLikeConnTrace struct { // Read implements model.UDPLikeConn.ReadFrom and saves network events. func (c *udpLikeConnTrace) ReadFrom(b []byte) (int, net.Addr, error) { + // record when we started measuring started := c.tx.TimeSince(c.tx.ZeroTime) + // perform the network operation count, addr, err := c.UDPLikeConn.ReadFrom(b) + // emit the network event finished := c.tx.TimeSince(c.tx.ZeroTime) address := addrStringIfNotNil(addr) select { @@ -115,9 +158,22 @@ func (c *udpLikeConnTrace) ReadFrom(b []byte) (int, net.Addr, error) { default: // buffer is full } + // possibly collect a download speed sample + c.tx.maybeUpdateBytesReceivedMapUDPLikeConn(addr, count) + + // return results to the caller return count, addr, err } +// maybeUpdateBytesReceivedMapUDPLikeConn updates the [*Trace] bytes received map for a [model.UDPLikeConn]. 
+func (tx *Trace) maybeUpdateBytesReceivedMapUDPLikeConn(addr net.Addr, count int) { + // Implementation note: the address may be nil if the operation failed given that we don't + // have a fixed peer address for UDP connections + if addr != nil { + tx.updateBytesReceivedMapNetConn(addr.Network(), addr.String(), count) + } +} + // Write implements model.UDPLikeConn.WriteTo and saves network events. func (c *udpLikeConnTrace) WriteTo(b []byte, addr net.Addr) (int, error) { started := c.tx.TimeSince(c.tx.ZeroTime) diff --git a/internal/measurexlite/conn_test.go b/internal/measurexlite/conn_test.go index ae6e195eb3..563ce31db8 100644 --- a/internal/measurexlite/conn_test.go +++ b/internal/measurexlite/conn_test.go @@ -103,6 +103,17 @@ func TestWrapNetConn(t *testing.T) { if err != nil { t.Fatal("invalid err") } + + t.Run("we update the trace's byte received map", func(t *testing.T) { + stats := trace.CloneBytesReceivedMap() + if len(stats) != 1 { + t.Fatal("expected to see just one entry") + } + if stats["1.1.1.1:443 tcp"] != 128 { + t.Fatal("expected to know we received 128 bytes") + } + }) + events := trace.NetworkEvents() if len(events) != 1 { t.Fatal("did not save network events") @@ -264,6 +275,9 @@ func TestWrapUDPLikeConn(t *testing.T) { MockString: func() string { return "1.1.1.1:443" }, + MockNetwork: func() string { + return "udp" + }, }, nil }, } @@ -284,6 +298,17 @@ func TestWrapUDPLikeConn(t *testing.T) { if err != nil { t.Fatal("invalid err") } + + t.Run("we update the trace's byte received map", func(t *testing.T) { + stats := trace.CloneBytesReceivedMap() + if len(stats) != 1 { + t.Fatal("expected to see just one entry") + } + if stats["1.1.1.1:443 udp"] != 128 { + t.Fatal("expected to know we received 128 bytes") + } + }) + events := trace.NetworkEvents() if len(events) != 1 { t.Fatal("did not save network events") @@ -310,6 +335,9 @@ func TestWrapUDPLikeConn(t *testing.T) { MockString: func() string { return "1.1.1.1:443" }, + MockNetwork: func() 
string { + return "udp" + }, }, nil }, } @@ -379,7 +407,7 @@ func TestWrapUDPLikeConn(t *testing.T) { } }) - t.Run("Write discards the event when the buffer is full", func(t *testing.T) { + t.Run("WriteTo discards the event when the buffer is full", func(t *testing.T) { underlying := &mocks.UDPLikeConn{ MockWriteTo: func(b []byte, addr net.Addr) (int, error) { return len(b), nil @@ -477,3 +505,64 @@ func TestNewAnnotationArchivalNetworkEvent(t *testing.T) { t.Fatal(diff) } } + +func TestTrace_updateBytesReceivedMapNetConn(t *testing.T) { + t.Run("we handle tcp4, tcp6, udp4 and udp6 like they were tcp and udp", func(t *testing.T) { + // create a new trace + tx := NewTrace(0, time.Now()) + + // insert stats for tcp, tcp4 and tcp6 + tx.updateBytesReceivedMapNetConn("tcp", "1.2.3.4:5678", 10) + tx.updateBytesReceivedMapNetConn("tcp4", "1.2.3.4:5678", 100) + tx.updateBytesReceivedMapNetConn("tcp", "[::1]:5678", 10) + tx.updateBytesReceivedMapNetConn("tcp6", "[::1]:5678", 100) + + // insert stats for udp, udp4 and udp6 + tx.updateBytesReceivedMapNetConn("udp", "1.2.3.4:5678", 10) + tx.updateBytesReceivedMapNetConn("udp4", "1.2.3.4:5678", 100) + tx.updateBytesReceivedMapNetConn("udp", "[::1]:5678", 10) + tx.updateBytesReceivedMapNetConn("udp6", "[::1]:5678", 100) + + // make sure the result is the expected one + expected := map[string]int64{ + "1.2.3.4:5678 tcp": 110, + "[::1]:5678 tcp": 110, + "1.2.3.4:5678 udp": 110, + "[::1]:5678 udp": 110, + } + got := tx.CloneBytesReceivedMap() + if diff := cmp.Diff(expected, got); diff != "" { + t.Fatal(diff) + } + }) +} + +func TestTrace_maybeUpdateBytesReceivedMapUDPLikeConn(t *testing.T) { + t.Run("we ignore cases where the address is nil", func(t *testing.T) { + // create a new trace + tx := NewTrace(0, time.Now()) + + // insert stats with a nil address + tx.maybeUpdateBytesReceivedMapUDPLikeConn(nil, 128) + + // inserts stats with a good address + goodAddr := &mocks.Addr{ + MockString: func() string { + return "1.2.3.4:5678" + 
}, + MockNetwork: func() string { + return "udp" + }, + } + tx.maybeUpdateBytesReceivedMapUDPLikeConn(goodAddr, 128) + + // make sure the result is the expected one + expected := map[string]int64{ + "1.2.3.4:5678 udp": 128, + } + got := tx.CloneBytesReceivedMap() + if diff := cmp.Diff(expected, got); diff != "" { + t.Fatal(diff) + } + }) +} diff --git a/internal/measurexlite/dialer.go b/internal/measurexlite/dialer.go index fae576bc66..32b9cdb194 100644 --- a/internal/measurexlite/dialer.go +++ b/internal/measurexlite/dialer.go @@ -18,10 +18,6 @@ import ( // NewDialerWithoutResolver is equivalent to netxlite.NewDialerWithoutResolver // except that it returns a model.Dialer that uses this trace. -// -// Note: unlike code in netx or measurex, this factory DOES NOT return you a -// dialer that also performs wrapping of a net.Conn in case of success. If you -// want to wrap the conn, you need to wrap it explicitly using model.Trace.WrapNetConn. func (tx *Trace) NewDialerWithoutResolver(dl model.DebugLogger) model.Dialer { return &dialerTrace{ d: tx.newDialerWithoutResolver(dl), diff --git a/internal/measurexlite/dns.go b/internal/measurexlite/dns.go index 514a6e36d9..076166565a 100644 --- a/internal/measurexlite/dns.go +++ b/internal/measurexlite/dns.go @@ -267,8 +267,9 @@ func (tx *Trace) FirstDNSLookup() *model.ArchivalDNSLookupResult { return ev[0] } -// ErrDelayedDNSResponseBufferFull indicates that the delayedDNSResponse buffer is full. -var ErrDelayedDNSResponseBufferFull = errors.New("buffer full") +// ErrDelayedDNSResponseBufferFull indicates that the delayed-DNS-response channel buffer is full. 
+var ErrDelayedDNSResponseBufferFull = errors.New( + "measurexlite: the delayed-DNS-response channel buffer is full") // OnDelayedDNSResponse implements model.Trace.OnDelayedDNSResponse func (tx *Trace) OnDelayedDNSResponse(started time.Time, txp model.DNSTransport, query model.DNSQuery, diff --git a/internal/measurexlite/doc.go b/internal/measurexlite/doc.go index cc52f8437f..8a5e717727 100644 --- a/internal/measurexlite/doc.go +++ b/internal/measurexlite/doc.go @@ -1,10 +1,44 @@ -// Package measurexlite contains measurement extensions. +// Package measurexlite contains measurement extensions. This package is named "measurex lite" +// because it implements a lightweight approach compared to a previous package named "measurex". // -// See docs/design/dd-003-step-by-step.md in the ooni/probe-cli -// repository for the design document. +// [measurexlite] implements the [dd-003-step-by-step.md] design document. The fundamental data type +// is the [*Trace], which saves events in buffered channels. The [NewTrace] constructor creates +// channels with sufficient capacity for tracing all the events we expect to see for a single use +// connection or for a DNS round trip. If you are not draining the channels, the [*Trace] will +// eventually stop collecting events, though. // -// This implementation features a Trace that saves events in -// buffered channels as proposed by df-003-step-by-step.md. We -// have reasonable default buffers for channels. But, if you -// are not draining them, eventually we stop collecting events. +// As mentioned above, the expectation is that a [*Trace] will only trace a single use connection or +// a DNS round trip. Typically, you create a distinct trace for each TCP-TLS-HTTP or TCP-HTTP or +// QUIC-HTTP or DNS-lookup-with-getaddrinfo or DNS-lookup-with-UDP sequence of operations. There is +// a "trace ID" for each trace, which you provide to [NewTrace]. This ID is copied into the +// "transaction_id" field of the archival network events. 
Therefore, by using distinct trace IDs +// for distinct operations, you enable [ooni/data] to group related events together. +// +// The [*Trace] features methods that mirror existing [netxlite] methods but implement support for +// collecting network events using the [*Trace]. For example, [*Trace.NewStdlibResolver] is like +// [netxlite.NewStdlibResolver] but the DNS lookups performed with the resolver returned by +// [*Trace.NewStdlibResolver] generate events that you can collect using the [*Trace]. +// +// As mentioned above, internally, the [*Trace] uses buffered channels on which the underlying +// network objects attempt to write when there is an interesting event. As a user of the +// [measurexlite] package, you have methods to extract the events from the [*Trace] channels, +// such as, for example: +// +// - [*Trace.DNSLookupsFromRoundTrip] +// +// - [*Trace.NetworkEvents] +// +// - [*Trace.TCPConnects] +// +// - [*Trace.QUICHandshakes] +// +// - [*Trace.TLSHandshakes] +// +// These methods already return data structures using the archival data format implemented +// by the [model] package and specified in the [ooni/spec] repository. Hence, these structures +// are ready to be added to OONI measurements. +// +// [dd-003-step-by-step.md]: https://github.com/ooni/probe-cli/blob/master/docs/design/dd-003-step-by-step.md +// [ooni/data]: https://github.com/ooni/data +// [ooni/spec]: https://github.com/ooni/spec package measurexlite diff --git a/internal/measurexlite/quic.go b/internal/measurexlite/quic.go index bbec9758a1..d69760be9c 100644 --- a/internal/measurexlite/quic.go +++ b/internal/measurexlite/quic.go @@ -107,8 +107,7 @@ func (tx *Trace) FirstQUICHandshakeOrNil() *model.ArchivalTLSOrQUICHandshakeResu return ev[0] } -// MaybeCloseQUICConn is a convenience function for closing a quic.EarlyConnection only when such a conn -// isn't nil. +// MaybeCloseQUICConn is a convenience function for closing a [quic.EarlyConnection] when it is not nil. 
func MaybeCloseQUICConn(conn quic.EarlyConnection) (err error) { if conn != nil { err = conn.CloseWithError(0, "") diff --git a/internal/measurexlite/trace.go b/internal/measurexlite/trace.go index 8ab0bddbbe..7f69eed017 100644 --- a/internal/measurexlite/trace.go +++ b/internal/measurexlite/trace.go @@ -5,6 +5,7 @@ package measurexlite // import ( + "sync" "time" "github.com/ooni/probe-cli/v3/internal/model" @@ -12,23 +13,33 @@ import ( utls "gitlab.com/yawning/utls.git" ) -// Trace implements model.Trace. +// Trace implements [model.Trace]. We use a [context.Context] to register ourselves +// as the [model.Trace] and we implement the [model.Trace] callbacks to route events +// into internal buffered channels as explained in the [measurexlite] package +// documentation. The zero-value of this struct is invalid. To construct use [NewTrace]. // -// The zero-value of this struct is invalid. To construct use NewTrace. -// -// # Buffered channels -// -// NewTrace uses reasonable buffer sizes for the channels used for collecting +// [NewTrace] uses reasonable buffer sizes for the channels used for collecting // events. You should drain the channels used by this implementation after -// each operation you perform (i.e., we expect you to peform step-by-step -// measurements). We have convenience methods for extracting events from the -// buffered channels. +// each operation you perform (that is, we expect you to perform [step-by-step +// measurements]). As mentioned in the [measurexlite] package documentation, +// there are several methods for extracting events from the [*Trace]. +// +// [step-by-step measurements]: https://github.com/ooni/probe-cli/blob/master/docs/design/dd-003-step-by-step.md type Trace struct { // Index is the unique index of this trace within the // current measurement. Note that this field MUST be read-only. Writing it // once you have constructed a trace MAY lead to data races. 
Index int64 + // bytesReceivedMap maps a remote endpoint to the bytes we received + // from such a remote endpoint. Accessing this map requires one to + // additionally hold the bytesReceivedMu mutex. + bytesReceivedMap map[string]int64 + + // bytesReceivedMu protects the bytesReceivedMap from concurrent + // access from multiple goroutines. + bytesReceivedMu *sync.Mutex + // networkEvent is MANDATORY and buffers network events. networkEvent chan *model.ArchivalNetworkEvent @@ -88,31 +99,23 @@ type Trace struct { ZeroTime time.Time } -const ( - // NetworkEventBufferSize is the buffer size for constructing - // the internal Trace's networkEvent buffered channel. - NetworkEventBufferSize = 64 +// NetworkEventBufferSize is the [*Trace] buffer size for network I/O events. +const NetworkEventBufferSize = 64 - // DNSLookupBufferSize is the buffer size for constructing - // the internal Trace's dnsLookup buffered channel. - DNSLookupBufferSize = 8 +// DNSLookupBufferSize is the [*Trace] buffer size for DNS lookup events. +const DNSLookupBufferSize = 8 - // DNSResponseBufferSize is the buffer size for constructing - // the internal Trace's dnsDelayedResponse buffered channel. - DelayedDNSResponseBufferSize = 8 +// DelayedDNSResponseBufferSize is the [*Trace] buffer size for delayed DNS response events. +const DelayedDNSResponseBufferSize = 8 - // TCPConnectBufferSize is the buffer size for constructing - // the internal Trace's tcpConnect buffered channel. - TCPConnectBufferSize = 8 +// TCPConnectBufferSize is the [*Trace] buffer size for TCP connect events. +const TCPConnectBufferSize = 8 - // TLSHandshakeBufferSize is the buffer for construcing - // the internal Trace's tlsHandshake buffered channel. - TLSHandshakeBufferSize = 8 +// TLSHandshakeBufferSize is the [*Trace] buffer size for TLS handshake events. +const TLSHandshakeBufferSize = 8 - // QUICHandshakeBufferSize is the buffer for constructing - // the internal Trace's quicHandshake buffered channel. 
- QUICHandshakeBufferSize = 8 -) +// QUICHandshakeBufferSize is the [*Trace] buffer size for QUIC handshake events. +const QUICHandshakeBufferSize = 8 // NewTrace creates a new instance of Trace using default settings. // @@ -130,7 +133,9 @@ const ( // to identify that some traces belong to some submeasurements). func NewTrace(index int64, zeroTime time.Time, tags ...string) *Trace { return &Trace{ - Index: index, + Index: index, + bytesReceivedMap: make(map[string]int64), + bytesReceivedMu: &sync.Mutex{}, networkEvent: make( chan *model.ArchivalNetworkEvent, NetworkEventBufferSize, diff --git a/internal/throttling/throttling.go b/internal/throttling/throttling.go new file mode 100644 index 0000000000..46252ed33f --- /dev/null +++ b/internal/throttling/throttling.go @@ -0,0 +1,150 @@ +// Package throttling samples the bytes received by a trace to measure throttling. +package throttling + +import ( + "context" + "strings" + "sync" + "time" + + "github.com/ooni/probe-cli/v3/internal/measurexlite" + "github.com/ooni/probe-cli/v3/internal/memoryless" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/runtimex" +) + +// Sampler samples, at randomized intervals, the cumulative bytes received by a [*measurexlite.Trace]. The zero +// value of this structure is invalid; please, construct using [NewSampler]. +type Sampler struct { + // cancel tells the background goroutine to stop + cancel context.CancelFunc + + // mu provides mutual exclusion + mu *sync.Mutex + + // once ensures that close has "once" semantics + once *sync.Once + + // q is the queue of events we are collecting + q []*model.ArchivalNetworkEvent + + // tx is the trace we are sampling from + tx *measurexlite.Trace + + // wg is the waitgroup to wait for the sampler to join + wg *sync.WaitGroup +} + +// NewSampler attaches a [*Sampler] to a [*measurexlite.Trace], starts sampling in the +// background and returns the [*Sampler]. 
Remember to call [*Sampler.Close] to stop +// the background goroutine that performs the sampling. +func NewSampler(tx *measurexlite.Trace) *Sampler { + ctx, cancel := context.WithCancel(context.Background()) + smpl := &Sampler{ + cancel: cancel, + mu: &sync.Mutex{}, + once: &sync.Once{}, + q: []*model.ArchivalNetworkEvent{}, + tx: tx, + wg: &sync.WaitGroup{}, + } + smpl.wg.Add(1) + go smpl.mainLoop(ctx) + return smpl +} + +func (smpl *Sampler) mainLoop(ctx context.Context) { + // make sure the parent knows when we're done running + defer smpl.wg.Done() + + // From the memoryless documentation: + // + // The exact mathematical meaning of "too extreme" depends on your situation, + // but a nice rule of thumb is config.Min should be at most 10% of expected and + // config.Max should be at least 250% of expected. These values mean that less + // than 10% of time you will be waiting config.Min and less than 10% of the time + // you will be waiting config.Max. + // + // So, we are going to use 250 milliseconds of expected, 25 milliseconds for the + // minimum value, and 650 milliseconds for the maximum value. + config := memoryless.Config{ + Expected: 250 * time.Millisecond, + Min: 25 * time.Millisecond, + Max: 650 * time.Millisecond, + Once: false, + } + + // create the memoryless ticker + ticker := runtimex.Try1(memoryless.NewTicker(ctx, config)) + defer ticker.Stop() + + // loop until we're asked to stop through the context + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + smpl.collectSnapshot(smpl.tx.CloneBytesReceivedMap()) + } + } +} + +// BytesReceivedCumulativeOperation is the operation we set for network events. 
+const BytesReceivedCumulativeOperation = "bytes_received_cumulative" + +func (smpl *Sampler) collectSnapshot(stats map[string]int64) { + // compute just once the events sampling time + now := smpl.tx.TimeSince(smpl.tx.ZeroTime).Seconds() + + // process each entry + for key, count := range stats { + // extract the network and the address from the map key + // note: the format is "EPNT_ADDRESS NETWORK" + vector := strings.Split(key, " ") + if len(vector) != 2 { + continue + } + address, network := vector[0], vector[1] + + // fill the event + ev := &model.ArchivalNetworkEvent{ + Address: address, + Failure: nil, + NumBytes: count, + Operation: BytesReceivedCumulativeOperation, + Proto: network, + T0: now, + T: now, + TransactionID: smpl.tx.Index, + Tags: smpl.tx.Tags(), + } + + // lock and insert + smpl.mu.Lock() + smpl.q = append(smpl.q, ev) + smpl.mu.Unlock() + } +} + +// Close closes the [*Sampler]. This method is goroutine safe and idempotent. +func (smpl *Sampler) Close() error { + smpl.once.Do(func() { + smpl.cancel() + smpl.wg.Wait() + }) + return nil +} + +// ExtractSamples extracts the samples from the [*Sampler] +func (smpl *Sampler) ExtractSamples() []*model.ArchivalNetworkEvent { + // collect one last sample -- no need to lock since collectSnapshot locks the mutex + smpl.collectSnapshot(smpl.tx.CloneBytesReceivedMap()) + + // lock and extract all samples + smpl.mu.Lock() + o := smpl.q + smpl.q = []*model.ArchivalNetworkEvent{} + smpl.mu.Unlock() + return o +} diff --git a/internal/throttling/throttling_test.go b/internal/throttling/throttling_test.go new file mode 100644 index 0000000000..64cab6a50e --- /dev/null +++ b/internal/throttling/throttling_test.go @@ -0,0 +1,153 @@ +package throttling + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/ooni/probe-cli/v3/internal/measurexlite" + "github.com/ooni/probe-cli/v3/internal/model" + "github.com/ooni/probe-cli/v3/internal/netxlite" + 
"github.com/ooni/probe-cli/v3/internal/randx" + "github.com/ooni/probe-cli/v3/internal/runtimex" +) + +func TestSamplerWorkingAsIntended(t *testing.T) { + const ( + chunkSize = 1 << 14 + repetitions = 10 + totalBody = repetitions * chunkSize + traceID = 14 + ) + + // create a testing server that sleeps after each sender for a given number of sends + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + chunk := []byte(randx.Letters(chunkSize)) + for idx := 0; idx < repetitions; idx++ { + w.Write(chunk) + time.Sleep(250 * time.Millisecond) + } + })) + defer server.Close() + + // create a trace + expectedTags := []string{"antani", "mascetti"} + tx := measurexlite.NewTrace(traceID, time.Now(), expectedTags...) + + // create a sampler for the trace + sampler := NewSampler(tx) + defer sampler.Close() + + // create a dialer + dialer := tx.NewDialerWithoutResolver(model.DiscardLogger) + + // create an HTTP transport + txp := netxlite.NewHTTPTransport(model.DiscardLogger, dialer, netxlite.NewNullTLSDialer()) + + // create the HTTP request to issue + req := runtimex.Try1(http.NewRequest("GET", server.URL, nil)) + + // issue the HTTP request and await for response + resp, err := txp.RoundTrip(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + t.Log("got response", resp) + + // read the response body + body, err := netxlite.ReadAllContext(req.Context(), resp.Body) + if err != nil { + t.Fatal(err) + } + + // make sure we've read the body + if len(body) != totalBody { + t.Fatal("expected", totalBody, "bytes but got", len(body), "bytes") + } + + // make sure we have events to process + events := sampler.ExtractSamples() + if len(events) <= 0 { + t.Fatal("expected to see at least one event") + } + + // make sure each event looks good + var ( + previousCounter int64 + previousT float64 + ) + for _, ev := range events { + t.Log(ev) + + // Make sure the address is the remote server address. 
+ if ev.Address != server.Listener.Addr().String() { + t.Fatal("invalid address", ev.Address) + } + + // There is no failure for this kind of events because we only collect statistics. + if ev.Failure != nil { + t.Fatal("invalid failure", ev.Failure) + } + + // The number of bytes received should increase monotonically + if ev.NumBytes < previousCounter { + t.Fatal("non-monotonic bytes increase", ev.NumBytes, previousCounter) + } + previousCounter = ev.NumBytes + + // The operation should always be the expected one + if ev.Operation != BytesReceivedCumulativeOperation { + t.Fatal("invalid operation", ev.Operation) + } + + // Make sure the protocol is the expected one + if ev.Proto != "tcp" { + t.Fatal("invalid proto", ev.Proto) + } + + // The time should also increase monotonically. It may be possible for this test + // to sometimes fail in cloud environments, based on other tests we have seen failing. + if ev.T != ev.T0 { + t.Fatal("T and T0 should be equal", ev.T, ev.T0) + } + if ev.T < previousT { + t.Fatal("non-monotonic time increase", ev.T, previousT) + } + previousT = ev.T + + // Make sure the trace ID is the expected one + if ev.TransactionID != traceID { + t.Fatal("unexpected transaction ID", ev.TransactionID, traceID) + } + + // Make sure the tags are the ones we expect to see + if diff := cmp.Diff(expectedTags, ev.Tags); diff != "" { + t.Fatal(diff) + } + } +} + +func TestSampleSkipsInvalidMapEntries(t *testing.T) { + // create a trace and a sampler + tx := measurexlite.NewTrace(0, time.Now()) + sampler := NewSampler(tx) + + // create a fake map with an invalid entry and submit it + stats := map[string]int64{ + "1.1.1.1:443": 128, // this entry is INVALID because it's missing the protocol + "1.1.1.1:443/tcp": 44, // INVALID because there's no space separator + } + + // update the stats + sampler.collectSnapshot(stats) + + // obtain the network events + ev := sampler.ExtractSamples() + if len(ev) != 0 { + t.Fatal("expected to see no events here") + } +}